A first attempt to fix the queues in a cothreaded pipeline.

Original commit message from CVS:
A first attempt to fix the queues in a cothreaded pipeline.
Some fixes to the thread handling.
Fix a bug in gstreamer-config : gthread was not included.
gst_bin_create_plan() is now done in the READY state.
a bin with only another bin in it will now work with gst_bin_iterate.
Added some examples for the queues.
This commit is contained in:
Wim Taymans 2000-09-22 23:35:14 +00:00
parent 41ad7a209b
commit e5ab7f33ac
35 changed files with 719 additions and 135 deletions

View file

@ -50,9 +50,6 @@ int main(int argc,char *argv[])
gst_pad_connect(gst_element_get_pad(decoder,"src"), gst_pad_connect(gst_element_get_pad(decoder,"src"),
gst_element_get_pad(audiosink,"sink")); gst_element_get_pad(audiosink,"sink"));
/* find out how to handle this bin */
gst_bin_create_plan(GST_BIN(bin));
/* make it ready */ /* make it ready */
gst_element_set_state(bin, GST_STATE_READY); gst_element_set_state(bin, GST_STATE_READY);
/* start playing */ /* start playing */

View file

@ -46,9 +46,6 @@ int main(int argc,char *argv[])
exit(-1); exit(-1);
} }
/* find out how to handle this bin */
gst_bin_create_plan(GST_BIN(pipeline));
/* make it ready */ /* make it ready */
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY); gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY);
/* start playing */ /* start playing */

2
examples/queue/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
Makefile
queue

84
examples/queue/queue.c Normal file
View file

@ -0,0 +1,84 @@
#include <gst/gst.h>
gboolean playing;
/* eos will be called when the src element has an end of stream */
void eos(GstSrc *src, gpointer data)
{
g_print("have eos, quitting\n");
playing = FALSE;
}
int main(int argc,char *argv[])
{
GstElement *disksrc, *audiosink, *parse, *decode, *queue;
GstElement *bin;
GstElement *thread;
if (argc != 2) {
g_print("usage: %s <filename>\n", argv[0]);
exit(-1);
}
gst_init(&argc,&argv);
/* create a new thread to hold the elements */
thread = gst_thread_new("thread");
g_assert(thread != NULL);
/* create a new bin to hold the elements */
bin = gst_bin_new("bin");
g_assert(bin != NULL);
/* create a disk reader */
disksrc = gst_elementfactory_make("disksrc", "disk_source");
g_assert(disksrc != NULL);
gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
GTK_SIGNAL_FUNC(eos), thread);
parse = gst_elementfactory_make("mp3parse", "parse");
decode = gst_elementfactory_make("mpg123", "decode");
queue = gst_elementfactory_make("queue", "queue");
/* and an audio sink */
audiosink = gst_elementfactory_make("audiosink", "play_audio");
g_assert(audiosink != NULL);
/* add objects to the main pipeline */
gst_bin_add(GST_BIN(bin), disksrc);
gst_bin_add(GST_BIN(bin), parse);
gst_bin_add(GST_BIN(bin), decode);
gst_bin_add(GST_BIN(bin), queue);
gst_bin_add(GST_BIN(thread), audiosink);
gst_bin_add(GST_BIN(bin), thread);
gst_pad_connect(gst_element_get_pad(disksrc,"src"),
gst_element_get_pad(parse,"sink"));
gst_pad_connect(gst_element_get_pad(parse,"src"),
gst_element_get_pad(decode,"sink"));
gst_pad_connect(gst_element_get_pad(decode,"src"),
gst_element_get_pad(queue,"sink"));
gst_pad_connect(gst_element_get_pad(queue,"src"),
gst_element_get_pad(audiosink,"sink"));
/* make it ready */
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY);
/* start playing */
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING);
playing = TRUE;
while (playing) {
gst_bin_iterate(GST_BIN(bin));
}
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL);
exit(0);
}

2
examples/queue2/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
Makefile
queue2

81
examples/queue2/queue2.c Normal file
View file

@ -0,0 +1,81 @@
#include <gst/gst.h>
gboolean playing;
/* eos will be called when the src element has an end of stream */
void eos(GstSrc *src, gpointer data)
{
g_print("have eos, quitting\n");
playing = FALSE;
}
int main(int argc,char *argv[])
{
GstElement *disksrc, *audiosink, *queue;
GstElement *pipeline;
GstElement *thread;
if (argc != 2) {
g_print("usage: %s <filename>\n", argv[0]);
exit(-1);
}
gst_init(&argc,&argv);
/* create a new thread to hold the elements */
thread = gst_thread_new("thread");
g_assert(thread != NULL);
/* create a new bin to hold the elements */
pipeline = gst_pipeline_new("pipeline");
g_assert(pipeline != NULL);
/* create a disk reader */
disksrc = gst_elementfactory_make("disksrc", "disk_source");
g_assert(disksrc != NULL);
gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
GTK_SIGNAL_FUNC(eos), thread);
queue = gst_elementfactory_make("queue", "queue");
/* and an audio sink */
audiosink = gst_elementfactory_make("audiosink", "play_audio");
g_assert(audiosink != NULL);
/* add objects to the main pipeline */
gst_pipeline_add_src(GST_PIPELINE(pipeline), disksrc);
gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue);
gst_bin_add(GST_BIN(thread), audiosink);
gst_pad_connect(gst_element_get_pad(queue,"src"),
gst_element_get_pad(audiosink,"sink"));
gst_pad_set_type_id(gst_element_get_pad(queue, "sink"),
gst_pad_get_type_id(gst_element_get_pad(audiosink, "sink")));
if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) {
g_print("cannot autoplug pipeline\n");
exit(-1);
}
gst_bin_add(GST_BIN(pipeline), thread);
/* make it ready */
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY);
/* start playing */
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);
playing = TRUE;
while (playing) {
gst_bin_iterate(GST_BIN(pipeline));
}
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL);
exit(0);
}

2
examples/queue3/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
Makefile
queue3

85
examples/queue3/queue3.c Normal file
View file

@ -0,0 +1,85 @@
#include <gst/gst.h>
gboolean playing;
/* eos will be called when the src element has an end of stream */
void eos(GstSrc *src, gpointer data)
{
g_print("have eos, quitting\n");
playing = FALSE;
}
int main(int argc,char *argv[])
{
GstElement *disksrc, *audiosink, *queue, *parse, *decode;
GstElement *bin;
GstElement *thread;
if (argc != 2) {
g_print("usage: %s <filename>\n", argv[0]);
exit(-1);
}
gst_init(&argc,&argv);
/* create a new thread to hold the elements */
thread = gst_thread_new("thread");
g_assert(thread != NULL);
/* create a new bin to hold the elements */
bin = gst_bin_new("bin");
g_assert(bin != NULL);
/* create a disk reader */
disksrc = gst_elementfactory_make("disksrc", "disk_source");
g_assert(disksrc != NULL);
gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
GTK_SIGNAL_FUNC(eos), thread);
queue = gst_elementfactory_make("queue", "queue");
/* and an audio sink */
audiosink = gst_elementfactory_make("audiosink", "play_audio");
g_assert(audiosink != NULL);
parse = gst_elementfactory_make("mp3parse", "parse");
decode = gst_elementfactory_make("mpg123", "decode");
/* add objects to the main bin */
gst_bin_add(GST_BIN(bin), disksrc);
gst_bin_add(GST_BIN(bin), queue);
gst_bin_add(GST_BIN(thread), parse);
gst_bin_add(GST_BIN(thread), decode);
gst_bin_add(GST_BIN(thread), audiosink);
gst_pad_connect(gst_element_get_pad(disksrc,"src"),
gst_element_get_pad(queue,"sink"));
gst_pad_connect(gst_element_get_pad(queue,"src"),
gst_element_get_pad(parse,"sink"));
gst_pad_connect(gst_element_get_pad(parse,"src"),
gst_element_get_pad(decode,"sink"));
gst_pad_connect(gst_element_get_pad(decode,"src"),
gst_element_get_pad(audiosink,"sink"));
gst_bin_add(GST_BIN(bin), thread);
/* make it ready */
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY);
/* start playing */
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING);
playing = TRUE;
while (playing) {
gst_bin_iterate(GST_BIN(bin));
}
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL);
exit(0);
}

View file

@ -53,9 +53,9 @@ int main(int argc,char *argv[])
exit(-1); exit(-1);
} }
gst_bin_remove(GST_BIN(pipeline), disksrc); //gst_bin_remove(GST_BIN(pipeline), disksrc);
gst_bin_add(GST_BIN(thread), disksrc); //gst_bin_add(GST_BIN(thread), disksrc);
gst_bin_add(GST_BIN(thread), GST_ELEMENT(pipeline)); gst_bin_add(GST_BIN(thread), GST_ELEMENT(pipeline));
/* make it ready */ /* make it ready */

View file

@ -41,6 +41,7 @@ cothread_state *cothread_create(cothread_context *ctx) {
s->threadnum = ctx->nthreads; s->threadnum = ctx->nthreads;
s->flags = 0; s->flags = 0;
s->sp = ((int *)s + COTHREAD_STACKSIZE); s->sp = ((int *)s + COTHREAD_STACKSIZE);
s->top_sp = s->sp;
ctx->threads[ctx->nthreads++] = s; ctx->threads[ctx->nthreads++] = s;
@ -103,6 +104,7 @@ void cothread_stub() {
thread->func(thread->argc,thread->argv); thread->func(thread->argc,thread->argv);
thread->flags &= ~COTHREAD_STARTED; thread->flags &= ~COTHREAD_STARTED;
thread->pc = 0; thread->pc = 0;
thread->sp = thread->top_sp;
DEBUG("cothread: cothread_stub() exit\n"); DEBUG("cothread: cothread_stub() exit\n");
//printf("uh, yeah, we shouldn't be here, but we should deal anyway\n"); //printf("uh, yeah, we shouldn't be here, but we should deal anyway\n");
} }
@ -113,8 +115,10 @@ void cothread_switch(cothread_state *thread) {
int enter; int enter;
// int i; // int i;
if (thread == NULL) if (thread == NULL) {
g_print("cothread: there's no thread, strange...\n");
return; return;
}
ctx = thread->ctx; ctx = thread->ctx;
@ -124,12 +128,10 @@ void cothread_switch(cothread_state *thread) {
exit(2); exit(2);
} }
/*
if (current == thread) { if (current == thread) {
g_print("cothread: trying to switch to same thread, legal but not necessary\n"); g_print("cothread: trying to switch to same thread, legal but not necessary\n");
//return; return;
} }
*/
// find the number of the thread to switch to // find the number of the thread to switch to
ctx->current = thread->threadnum; ctx->current = thread->threadnum;
@ -137,11 +139,14 @@ void cothread_switch(cothread_state *thread) {
/* save the current stack pointer, frame pointer, and pc */ /* save the current stack pointer, frame pointer, and pc */
GET_SP(current->sp); GET_SP(current->sp);
enter = setjmp(current->jmp); enter = sigsetjmp(current->jmp, 1);
DEBUG("cothread: after thread #%d %d\n",ctx->current, enter);
if (enter != 0) { if (enter != 0) {
DEBUG("cothread: enter thread #%d %d %p<->%p (%d)\n",current->threadnum, enter,
current->sp, current->top_sp, current->top_sp-current->sp);
return; return;
} }
DEBUG("cothread: exit thread #%d %d %p<->%p (%d)\n",current->threadnum, enter,
current->sp, current->top_sp, current->top_sp-current->sp);
enter = 1; enter = 1;
DEBUG("cothread: set stack to %p\n", thread->sp); DEBUG("cothread: set stack to %p\n", thread->sp);
@ -150,7 +155,7 @@ void cothread_switch(cothread_state *thread) {
DEBUG("cothread: in thread \n"); DEBUG("cothread: in thread \n");
SET_SP(thread->sp); SET_SP(thread->sp);
// switch to it // switch to it
longjmp(thread->jmp,1); siglongjmp(thread->jmp,1);
} else { } else {
SETUP_STACK(thread->sp); SETUP_STACK(thread->sp);
SET_SP(thread->sp); SET_SP(thread->sp);
@ -158,5 +163,6 @@ void cothread_switch(cothread_state *thread) {
//JUMP(cothread_stub); //JUMP(cothread_stub);
cothread_stub(); cothread_stub();
DEBUG("cothread: exit thread \n"); DEBUG("cothread: exit thread \n");
ctx->current = 0;
} }
} }

View file

@ -29,8 +29,9 @@ struct _cothread_state {
int flags; int flags;
int *sp; int *sp;
int *top_sp;
int *pc; int *pc;
jmp_buf jmp; sigjmp_buf jmp;
}; };
struct _cothread_context { struct _cothread_context {

View file

@ -27,6 +27,7 @@
#include <gstqueue.h> #include <gstqueue.h>
#include <gst/gstarch.h>
GstElementDetails gst_queue_details = { GstElementDetails gst_queue_details = {
"Queue", "Queue",
@ -105,6 +106,7 @@ static void gst_queue_class_init(GstQueueClass *klass) {
static void gst_queue_init(GstQueue *queue) { static void gst_queue_init(GstQueue *queue) {
queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK); queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK);
gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad); gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad);
gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain); gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain);
queue->srcpad = gst_pad_new("src",GST_PAD_SRC); queue->srcpad = gst_pad_new("src",GST_PAD_SRC);
gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad); gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad);
@ -150,9 +152,10 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
/* we have to lock the queue since we span threads */ /* we have to lock the queue since we span threads */
DEBUG("queue: %s adding buffer %p\n", name, buf); DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self());
GST_LOCK(queue); GST_LOCK(queue);
DEBUG("queue: have queue lock\n");
if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) { if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) {
g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name);
@ -164,18 +167,17 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
if (queue->level_buffers >= queue->max_buffers) {
DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
while (queue->level_buffers >= queue->max_buffers) {
GST_UNLOCK(queue);
g_mutex_lock(queue->fulllock); g_mutex_lock(queue->fulllock);
while (queue->level_buffers >= queue->max_buffers) {
DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
STATUS("%s: O\n"); STATUS("%s: O\n");
GST_UNLOCK(queue);
g_cond_wait(queue->fullcond,queue->fulllock); g_cond_wait(queue->fullcond,queue->fulllock);
g_mutex_unlock(queue->fulllock);
GST_LOCK(queue); GST_LOCK(queue);
} STATUS("%s: O+\n");
DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
} }
g_mutex_unlock(queue->fulllock);
/* put the buffer on the head of the list */ /* put the buffer on the head of the list */
@ -192,18 +194,19 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
STATUS("%s: +\n"); STATUS("%s: +\n");
/* if we were empty, but aren't any more, signal a condition */ /* if we were empty, but aren't any more, signal a condition */
tosignal = (queue->level_buffers <= 0); tosignal = (queue->level_buffers >= 0);
queue->level_buffers++; queue->level_buffers++;
/* we can unlock now */ /* we can unlock now */
DEBUG("queue: %s chain %d end\n", name, queue->level_buffers); DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
GST_UNLOCK(queue); GST_UNLOCK(queue);
if (tosignal) { if (tosignal) {
g_mutex_lock(queue->emptylock); g_mutex_lock(queue->emptylock);
STATUS("%s: >\n");
g_cond_signal(queue->emptycond); g_cond_signal(queue->emptycond);
STATUS("%s: >>\n");
g_mutex_unlock(queue->emptylock); g_mutex_unlock(queue->emptylock);
//g_print(">");
} }
} }
@ -216,17 +219,20 @@ void gst_queue_push(GstConnection *connection) {
name = gst_element_get_name(GST_ELEMENT(queue)); name = gst_element_get_name(GST_ELEMENT(queue));
DEBUG("queue: %s push %d\n", name, queue->level_buffers); DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond);
/* have to lock for thread-safety */ /* have to lock for thread-safety */
DEBUG("queue: try have queue lock\n");
GST_LOCK(queue); GST_LOCK(queue);
DEBUG("queue: have queue lock\n");
while (!queue->level_buffers) { while (!queue->level_buffers) {
STATUS("%s: U released lock\n");
GST_UNLOCK(queue); GST_UNLOCK(queue);
g_mutex_lock(queue->emptylock); g_mutex_lock(queue->emptylock);
STATUS("%s: U\n");
g_cond_wait(queue->emptycond,queue->emptylock); g_cond_wait(queue->emptycond,queue->emptylock);
g_mutex_unlock(queue->emptylock); g_mutex_unlock(queue->emptylock);
GST_LOCK(queue); GST_LOCK(queue);
STATUS("%s: U- getting lock\n");
} }
front = queue->queue; front = queue->queue;
@ -240,13 +246,15 @@ void gst_queue_push(GstConnection *connection) {
if (tosignal) { if (tosignal) {
g_mutex_lock(queue->fulllock); g_mutex_lock(queue->fulllock);
STATUS("%s: < \n");
g_cond_signal(queue->fullcond); g_cond_signal(queue->fullcond);
STATUS("%s: << \n");
g_mutex_unlock(queue->fulllock); g_mutex_unlock(queue->fulllock);
} }
DEBUG("queue: %s pushing %d %p\n", name, queue->level_buffers, buf); //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf);
gst_pad_push(queue->srcpad,buf); gst_pad_push(queue->srcpad,buf);
DEBUG("queue: %s pushing %d done\n", name, queue->level_buffers); //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers);
/* unlock now */ /* unlock now */
} }

View file

@ -202,6 +202,9 @@ static GstElementStateReturn gst_bin_change_state(GstElement *element) {
_gst_print_statename(GST_STATE(element)),GST_STATE_PENDING(element), _gst_print_statename(GST_STATE(element)),GST_STATE_PENDING(element),
_gst_print_statename(GST_STATE_PENDING(element))); _gst_print_statename(GST_STATE_PENDING(element)));
if (GST_STATE_PENDING(element) == GST_STATE_READY) {
gst_bin_create_plan(bin);
}
// g_return_val_if_fail(bin->numchildren != 0, GST_STATE_FAILURE); // g_return_val_if_fail(bin->numchildren != 0, GST_STATE_FAILURE);
// g_print("-->\n"); // g_print("-->\n");
@ -226,18 +229,12 @@ static GstElementStateReturn gst_bin_change_state(GstElement *element) {
} }
// g_print("<-- \"%s\"\n",gst_object_get_name(GST_OBJECT(bin))); // g_print("<-- \"%s\"\n",gst_object_get_name(GST_OBJECT(bin)));
// if (GST_STATE_PENDING(element),
return gst_bin_change_state_norecurse(bin); return gst_bin_change_state_norecurse(bin);
} }
static GstElementStateReturn gst_bin_change_state_norecurse(GstBin *bin) { static GstElementStateReturn gst_bin_change_state_norecurse(GstBin *bin) {
/*
if ((state == GST_STATE_READY) && (GST_STATE(bin) < GST_STATE_READY)) {
// gst_bin_create_plan(
}
*/
if (GST_ELEMENT_CLASS(parent_class)->change_state) if (GST_ELEMENT_CLASS(parent_class)->change_state)
return GST_ELEMENT_CLASS(parent_class)->change_state(GST_ELEMENT(bin)); return GST_ELEMENT_CLASS(parent_class)->change_state(GST_ELEMENT(bin));
@ -292,8 +289,8 @@ gboolean gst_bin_set_state_type(GstBin *bin,
GtkType type) { GtkType type) {
GstBinClass *oclass; GstBinClass *oclass;
// g_print("gst_bin_set_state_type(\"%s\",%d,%d)\n", DEBUG("gst_bin_set_state_type(\"%s\",%d,%d)\n",
// gst_object_get_name(GST_OBJECT(bin)),state,type); gst_element_get_name(GST_ELEMENT(bin)),state,type);
g_return_val_if_fail(bin != NULL, FALSE); g_return_val_if_fail(bin != NULL, FALSE);
g_return_val_if_fail(GST_IS_BIN(bin), FALSE); g_return_val_if_fail(GST_IS_BIN(bin), FALSE);
@ -310,7 +307,7 @@ void gst_bin_real_destroy(GtkObject *object) {
GList *children; GList *children;
GstElement *child; GstElement *child;
// g_print("in gst_bin_real_destroy()\n"); DEBUG("in gst_bin_real_destroy()\n");
children = bin->children; children = bin->children;
while (children) { while (children) {
@ -427,32 +424,38 @@ static int gst_bin_loopfunc_wrapper(int argc,char *argv[]) {
GList *pads; GList *pads;
GstPad *pad; GstPad *pad;
GstBuffer *buf; GstBuffer *buf;
gchar *name = gst_element_get_name(element);
// g_print("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n", DEBUG("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n",
// argc,gst_element_get_name(element)); argc,gst_element_get_name(element));
if (element->loopfunc != NULL) { if (element->loopfunc != NULL) {
while (1) { while (1) {
DEBUG("** gst_bin_loopfunc_wrapper(): element has loop function, calling it\n"); DEBUG("** gst_bin_loopfunc_wrapper(): element %s has loop function, calling it\n", name);
(element->loopfunc)(element); (element->loopfunc)(element);
DEBUG("** gst_bin_loopfunc_wrapper(): element ended loop function\n"); DEBUG("** gst_bin_loopfunc_wrapper(): element %s ended loop function\n", name);
} }
} else { } else {
DEBUG("** gst_bin_loopfunc_wrapper(): element is chain-based, calling in infinite loop\n"); DEBUG("** gst_bin_loopfunc_wrapper(): element %s is chain-based, calling in infinite loop\n", name);
if (GST_IS_SRC(element)) { if (GST_IS_SRC(element)) {
//while (1) { DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source %s\n", name);
DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source\n");
gst_src_push(GST_SRC(element)); gst_src_push(GST_SRC(element));
//} DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source %s done\n", name);
} else if (GST_IS_CONNECTION(element) && argc == 1) {
while (1) {
DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of connection %s\n", name);
gst_connection_push(GST_CONNECTION(element));
DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of connection %s done\n", name);
}
} else { } else {
while (1) { while (1) {
pads = element->pads; pads = element->pads;
while (pads) { while (pads) {
pad = GST_PAD(pads->data); pad = GST_PAD(pads->data);
if (pad->direction == GST_PAD_SINK) { if (pad->direction == GST_PAD_SINK) {
DEBUG("** gst_bin_loopfunc_wrapper(): pulling a buffer\n"); DEBUG("** gst_bin_loopfunc_wrapper(): pulling a buffer from %s:%s\n", name, gst_pad_get_name(pad));
buf = gst_pad_pull(pad); buf = gst_pad_pull(pad);
DEBUG("** gst_bin_loopfunc_wrapper(): calling chain function\n"); DEBUG("** gst_bin_loopfunc_wrapper(): calling chain function of %s:%s\n", name, gst_pad_get_name(pad));
(pad->chainfunc)(pad,buf); (pad->chainfunc)(pad,buf);
} }
pads = g_list_next(pads); pads = g_list_next(pads);
@ -484,7 +487,7 @@ static void gst_bin_create_plan_func(GstBin *bin) {
GstElement *element; GstElement *element;
int sink_pads; int sink_pads;
GList *pads; GList *pads;
GstPad *pad, *peer; GstPad *pad, *opad, *peer;
GstElement *outside; GstElement *outside;
g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin))); g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin)));
@ -499,13 +502,11 @@ static void gst_bin_create_plan_func(GstBin *bin) {
if (element->loopfunc != NULL) { if (element->loopfunc != NULL) {
g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin))); g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin)));
bin->need_cothreads = TRUE; bin->need_cothreads = TRUE;
break;
} }
// if it's a complex element, use cothreads // if it's a complex element, use cothreads
if (GST_ELEMENT_IS_MULTI_IN(element)) { else if (GST_ELEMENT_IS_MULTI_IN(element)) {
g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin))); g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin)));
bin->need_cothreads = TRUE; bin->need_cothreads = TRUE;
break;
} }
// if it has more than one input pad, use cothreads // if it has more than one input pad, use cothreads
sink_pads = 0; sink_pads = 0;
@ -527,6 +528,11 @@ static void gst_bin_create_plan_func(GstBin *bin) {
// FIXME // FIXME
bin->need_cothreads &= bin->use_cothreads; bin->need_cothreads &= bin->use_cothreads;
// clear previous plan state
g_list_free(bin->entries);
bin->entries = NULL;
bin->numentries = 0;
if (bin->need_cothreads) { if (bin->need_cothreads) {
g_print("gstbin: need cothreads\n"); g_print("gstbin: need cothreads\n");
@ -553,20 +559,54 @@ static void gst_bin_create_plan_func(GstBin *bin) {
pad = GST_PAD(pads->data); pad = GST_PAD(pads->data);
g_print("gstbin: setting push&pull handlers for %s:%s\n", g_print("gstbin: setting push&pull handlers for %s:%s\n",
gst_element_get_name(element),gst_pad_get_name(pad)); gst_element_get_name(element),gst_pad_get_name(pad));
// if (pad->direction == GST_PAD_SRC)
// an internal connection will push outside this bin.
if (!GST_IS_CONNECTION(element)) {
pad->pushfunc = gst_bin_pushfunc_wrapper; pad->pushfunc = gst_bin_pushfunc_wrapper;
// else }
pad->pullfunc = gst_bin_pullfunc_wrapper; pad->pullfunc = gst_bin_pullfunc_wrapper;
/* we only worry about sink pads */
if (gst_pad_get_direction(pad) == GST_PAD_SINK) {
/* get the pad's peer */
peer = gst_pad_get_peer(pad);
if (!peer) break;
/* get the parent of the peer of the pad */
outside = GST_ELEMENT(gst_pad_get_parent(peer));
if (!outside) break;
/* if it's a connection and it's not ours... */
if (GST_IS_CONNECTION(outside) &&
(gst_object_get_parent(GST_OBJECT(outside)) != GST_OBJECT(bin))) {
GList *connection_pads = gst_element_get_pad_list(outside);
while (connection_pads) {
opad = GST_PAD(connection_pads->data);
if (gst_pad_get_direction(opad) == GST_PAD_SRC) {
g_print("gstbin: setting push&pull handlers for %s:%s SRC connection\n",
gst_element_get_name(outside),gst_pad_get_name(opad));
opad->pushfunc = gst_bin_pushfunc_wrapper;
opad->pullfunc = gst_bin_pullfunc_wrapper;
if (outside->threadstate == NULL) {
outside->threadstate = cothread_create(bin->threadcontext);
cothread_setfunc(outside->threadstate,gst_bin_loopfunc_wrapper,
1,(char **)outside);
}
}
connection_pads = g_list_next(connection_pads);
}
gst_info("gstbin: element \"%s\" is the external source Connection "
"for internal element \"%s\"\n",
gst_element_get_name(GST_ELEMENT(outside)),
gst_element_get_name(GST_ELEMENT(element)));
bin->entries = g_list_prepend(bin->entries,outside);
bin->numentries++;
}
}
pads = g_list_next(pads); pads = g_list_next(pads);
} }
elements = g_list_next(elements); elements = g_list_next(elements);
} }
} else { } else {
g_print("gstbin: don't need cothreads, looking for entry points\n"); g_print("gstbin: don't need cothreads, looking for entry points\n");
// clear previous plan state
g_list_free(bin->entries);
bin->entries = NULL;
bin->numentries = 0;
// we have to find which elements will drive an iteration // we have to find which elements will drive an iteration
elements = bin->children; elements = bin->children;
while (elements) { while (elements) {
@ -592,8 +632,8 @@ gst_element_get_name(element),gst_pad_get_name(pad));
/* if it's a connection and it's not ours... */ /* if it's a connection and it's not ours... */
if (GST_IS_CONNECTION(outside) && if (GST_IS_CONNECTION(outside) &&
(gst_object_get_parent(GST_OBJECT(outside)) != GST_OBJECT(bin))) { (gst_object_get_parent(GST_OBJECT(outside)) != GST_OBJECT(bin))) {
gst_info("gstbin: element \"%s\" is the external source Connection \ gst_info("gstbin: element \"%s\" is the external source Connection "
for internal element \"%s\"\n", "for internal element \"%s\"\n",
gst_element_get_name(GST_ELEMENT(outside)), gst_element_get_name(GST_ELEMENT(outside)),
gst_element_get_name(GST_ELEMENT(element))); gst_element_get_name(GST_ELEMENT(element)));
bin->entries = g_list_prepend(bin->entries,outside); bin->entries = g_list_prepend(bin->entries,outside);
@ -617,7 +657,6 @@ void gst_bin_iterate_func(GstBin *bin) {
g_return_if_fail(bin != NULL); g_return_if_fail(bin != NULL);
g_return_if_fail(GST_IS_BIN(bin)); g_return_if_fail(GST_IS_BIN(bin));
g_return_if_fail(GST_STATE(bin) == GST_STATE_PLAYING); g_return_if_fail(GST_STATE(bin) == GST_STATE_PLAYING);
g_return_if_fail(bin->numentries > 0);
DEBUG("GstBin: iterating\n"); DEBUG("GstBin: iterating\n");
@ -628,7 +667,13 @@ void gst_bin_iterate_func(GstBin *bin) {
gst_element_get_name(GST_ELEMENT(bin->children->data))); gst_element_get_name(GST_ELEMENT(bin->children->data)));
cothread_switch(GST_ELEMENT(bin->children->data)->threadstate); cothread_switch(GST_ELEMENT(bin->children->data)->threadstate);
} else { } else {
if (bin->numentries <= 0) {
printf("gstbin: no elements in bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin)));
entries = bin->children;
}
else {
entries = bin->entries; entries = bin->entries;
}
while (entries) { while (entries) {
entry = GST_ELEMENT(entries->data); entry = GST_ELEMENT(entries->data);
@ -636,6 +681,8 @@ void gst_bin_iterate_func(GstBin *bin) {
gst_src_push(GST_SRC(entry)); gst_src_push(GST_SRC(entry));
else if (GST_IS_CONNECTION(entry)) else if (GST_IS_CONNECTION(entry))
gst_connection_push(GST_CONNECTION(entry)); gst_connection_push(GST_CONNECTION(entry));
else if (GST_IS_BIN(entry))
gst_bin_iterate(GST_BIN(entry));
else else
g_assert_not_reached(); g_assert_not_reached();
entries = g_list_next(entries); entries = g_list_next(entries);

View file

@ -183,16 +183,11 @@ void gst_pad_set_qos_function(GstPad *pad,GstPadQoSFunction qos) {
void gst_pad_push(GstPad *pad,GstBuffer *buffer) { void gst_pad_push(GstPad *pad,GstBuffer *buffer) {
g_return_if_fail(pad != NULL); g_return_if_fail(pad != NULL);
g_return_if_fail(GST_IS_PAD(pad)); g_return_if_fail(GST_IS_PAD(pad));
// g_return_if_fail(GST_PAD_CONNECTED(pad)); g_return_if_fail(GST_PAD_CONNECTED(pad));
g_return_if_fail(buffer != NULL); g_return_if_fail(buffer != NULL);
gst_trace_add_entry(NULL,0,buffer,"push buffer"); gst_trace_add_entry(NULL,0,buffer,"push buffer");
// FIXME we should probably make some noise here...
if (!GST_PAD_CONNECTED(pad)) return;
// g_return_if_fail(pad->pushfunc != NULL);
// first check to see if there's a push handler // first check to see if there's a push handler
if (pad->pushfunc != NULL) { if (pad->pushfunc != NULL) {
//g_print("-- gst_pad_push(): putting buffer in pen and calling push handler\n"); //g_print("-- gst_pad_push(): putting buffer in pen and calling push handler\n");
@ -230,7 +225,7 @@ GstBuffer *gst_pad_pull(GstPad *pad) {
// g_print("-- gst_pad_pull(): calling pull handler\n"); // g_print("-- gst_pad_pull(): calling pull handler\n");
(pad->pullfunc)(pad->peer); (pad->pullfunc)(pad->peer);
} else { } else {
// g_print("-- gst_pad_pull(): no buffer in pen, and no handler to get one there!!!\n"); g_print("-- gst_pad_pull(): no buffer in pen, and no handler to get one there!!!\n");
} }
} }
@ -242,7 +237,7 @@ GstBuffer *gst_pad_pull(GstPad *pad) {
return buf; return buf;
// else we have a big problem... // else we have a big problem...
} else { } else {
// g_print("-- gst_pad_pull(): uh, nothing in pen and no handler\n"); g_print("-- gst_pad_pull(): uh, nothing in pen and no handler\n");
return NULL; return NULL;
} }

View file

@ -347,10 +347,27 @@ gboolean gst_pipeline_autoplug(GstPipeline *pipeline) {
elements = pipeline->sinks; elements = pipeline->sinks;
// fase 2, find all the sinks.. // fase 2, loop over all the sinks..
while (elements) { while (elements) {
GList *pads;
GstPad *pad;
element = GST_ELEMENT(elements->data); element = GST_ELEMENT(elements->data);
pads = gst_element_get_pad_list(element);
while (pads) {
pad = (GstPad *)pads->data;
if (pad->direction == GST_PAD_SINK) {
sink_type = gst_pad_get_type_id(pad);
sinkelement = element;
break;
}
pads = g_list_next(pads);
}
/*
if (GST_IS_SINK(element)) { if (GST_IS_SINK(element)) {
g_print("GstPipeline: found sink \"%s\"\n", gst_element_get_name(element)); g_print("GstPipeline: found sink \"%s\"\n", gst_element_get_name(element));
@ -369,6 +386,7 @@ gboolean gst_pipeline_autoplug(GstPipeline *pipeline) {
gst_element_get_name(element), sink_type); gst_element_get_name(element), sink_type);
} }
} }
*/
elements = g_list_next(elements); elements = g_list_next(elements);
} }

View file

@ -105,9 +105,6 @@ gst_thread_class_init(GstThreadClass *klass) {
static void gst_thread_init(GstThread *thread) { static void gst_thread_init(GstThread *thread) {
GST_FLAG_SET(thread,GST_THREAD_CREATE); GST_FLAG_SET(thread,GST_THREAD_CREATE);
// thread->entries = NULL;
// thread->numentries = 0;
thread->lock = g_mutex_new(); thread->lock = g_mutex_new();
thread->cond = g_cond_new(); thread->cond = g_cond_new();
} }
@ -178,10 +175,13 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) {
pending = GST_STATE_PENDING(element); pending = GST_STATE_PENDING(element);
if (pending == GST_STATE(element)) return GST_STATE_SUCCESS;
if (GST_ELEMENT_CLASS(parent_class)->change_state) if (GST_ELEMENT_CLASS(parent_class)->change_state)
stateset = GST_ELEMENT_CLASS(parent_class)->change_state(element); stateset = GST_ELEMENT_CLASS(parent_class)->change_state(element);
gst_info("gstthread: stateset %d %d\n", stateset, GST_STATE_PENDING(element)); gst_info("gstthread: stateset %d %d %d\n", GST_STATE(element), stateset, GST_STATE_PENDING(element));
switch (pending) { switch (pending) {
case GST_STATE_READY: case GST_STATE_READY:
@ -189,13 +189,10 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) {
// we want to prepare our internal state for doing the iterations // we want to prepare our internal state for doing the iterations
gst_info("gstthread: preparing thread \"%s\" for iterations:\n", gst_info("gstthread: preparing thread \"%s\" for iterations:\n",
gst_element_get_name(GST_ELEMENT(element))); gst_element_get_name(GST_ELEMENT(element)));
//gst_thread_prepare(thread);
gst_bin_create_plan(GST_BIN(thread));
// if (thread->numentries == 0)
// return FALSE;
// set the state to idle // set the state to idle
GST_FLAG_UNSET(thread,GST_THREAD_STATE_SPINNING); GST_FLAG_UNSET(thread,GST_THREAD_STATE_SPINNING);
GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING);
// create the thread if that's what we're supposed to do // create the thread if that's what we're supposed to do
gst_info("gstthread: flags are 0x%08x\n",GST_FLAGS(thread)); gst_info("gstthread: flags are 0x%08x\n",GST_FLAGS(thread));
if (GST_FLAG_IS_SET(thread,GST_THREAD_CREATE)) { if (GST_FLAG_IS_SET(thread,GST_THREAD_CREATE)) {
@ -214,12 +211,14 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) {
gst_info("gstthread: starting thread \"%s\"\n", gst_info("gstthread: starting thread \"%s\"\n",
gst_element_get_name(GST_ELEMENT(element))); gst_element_get_name(GST_ELEMENT(element)));
GST_FLAG_SET(thread,GST_THREAD_STATE_SPINNING); GST_FLAG_SET(thread,GST_THREAD_STATE_SPINNING);
GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING);
gst_thread_signal_thread(thread); gst_thread_signal_thread(thread);
break; break;
case GST_STATE_PAUSED: case GST_STATE_PAUSED:
gst_info("gstthread: pausing thread \"%s\"\n", gst_info("gstthread: pausing thread \"%s\"\n",
gst_element_get_name(GST_ELEMENT(element))); gst_element_get_name(GST_ELEMENT(element)));
GST_FLAG_UNSET(thread,GST_THREAD_STATE_SPINNING); GST_FLAG_UNSET(thread,GST_THREAD_STATE_SPINNING);
GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING);
gst_thread_signal_thread(thread); gst_thread_signal_thread(thread);
break; break;
case GST_STATE_NULL: case GST_STATE_NULL:

View file

@ -1,7 +1,7 @@
## Process this file with automake to produce Makefile.in ## Process this file with automake to produce Makefile.in
INCLUDES = $(GLIB_CFLAGS) $(GTK_CFLAGS) -I$(top_srcdir) \ INCLUDES = $(GLIB_CFLAGS) $(GTK_CFLAGS) -I$(top_srcdir) \
$(shell gnome-config --cflags gnomeui) $(shell gnome-config --cflags gnomeui) $(shell gstreamer-config --cflags)
bin_PROGRAMS = gstplay bin_PROGRAMS = gstplay
@ -19,8 +19,10 @@ noinst_HEADERS = codecs.h
CFLAGS += -O2 -Wall -DDATADIR=\""$(gladedir)/"\" CFLAGS += -O2 -Wall -DDATADIR=\""$(gladedir)/"\"
gstplay_CFLAGS = $(shell gnome-config --cflags gnomeui) $(shell libglade-config --cflags gnome) gstplay_CFLAGS = $(shell gnome-config --cflags gnomeui) $(shell libglade-config --cflags gnome) \
gstplay_LDFLAGS = $(shell gnome-config --libs gnomeui) $(shell libglade-config --libs gnome) $(shell gstreamer-config --cflags )
gstplay_LDFLAGS = $(shell gnome-config --libs gnomeui) $(shell libglade-config --libs gnome) \
$(shell gstreamer-config --libs )
if HAVE_LIBXV if HAVE_LIBXV
xvlibs=-lXv xvlibs=-lXv

View file

@ -17,12 +17,14 @@ void avi_new_pad_created(GstElement *parse,GstPad *pad,GstElement *pipeline)
//if (0) { //if (0) {
if (strncmp(gst_pad_get_name(pad), "audio_", 6) == 0) { if (strncmp(gst_pad_get_name(pad), "audio_", 6) == 0) {
gst_bin_add(GST_BIN(pipeline), audio_render_queue);
gst_pad_connect(pad, gst_pad_connect(pad,
gst_element_get_pad(audio_render_queue,"sink")); gst_element_get_pad(audio_render_queue,"sink"));
} else if (strncmp(gst_pad_get_name(pad), "video_", 6) == 0) { } else if (strncmp(gst_pad_get_name(pad), "video_", 6) == 0) {
//} else if (0) { //} else if (0) {
gst_bin_add(GST_BIN(pipeline), video_render_queue);
gst_pad_connect(pad, gst_pad_connect(pad,
gst_element_get_pad(video_render_queue,"sink")); gst_element_get_pad(video_render_queue,"sink"));
} }

View file

@ -27,7 +27,7 @@ gboolean idle_func(gpointer data);
GstElement *show, *video_render_queue; GstElement *show, *video_render_queue;
GstElement *audio_play, *audio_render_queue; GstElement *audio_play, *audio_render_queue;
GstElement *src; GstElement *src;
GstPipeline *pipeline; GstElement *pipeline;
GstElement *parse = NULL; GstElement *parse = NULL;
GstElement *typefind; GstElement *typefind;
GstElement *video_render_thread; GstElement *video_render_thread;
@ -254,6 +254,9 @@ static void have_type(GstSink *sink) {
} }
gtk_object_set(GTK_OBJECT(src),"offset",0,NULL); gtk_object_set(GTK_OBJECT(src),"offset",0,NULL);
gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_thread));
gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_thread));
g_print("setting to READY state\n"); g_print("setting to READY state\n");
gst_element_set_state(GST_ELEMENT(pipeline),GST_STATE_READY); gst_element_set_state(GST_ELEMENT(pipeline),GST_STATE_READY);
g_print("setting to PLAYING state\n"); g_print("setting to PLAYING state\n");
@ -286,7 +289,6 @@ gint start_from_file(guchar *filename)
g_print("setting to READY state\n"); g_print("setting to READY state\n");
gst_bin_create_plan(GST_BIN(pipeline));
gst_element_set_state(GST_ELEMENT(pipeline),GST_STATE_READY); gst_element_set_state(GST_ELEMENT(pipeline),GST_STATE_READY);
state = GSTPLAY_STOPPED; state = GSTPLAY_STOPPED;
@ -363,17 +365,13 @@ main (int argc, char *argv[])
gnome_dock_set_client_area(GNOME_DOCK(glade_xml_get_widget(xml, "dock1")), gnome_dock_set_client_area(GNOME_DOCK(glade_xml_get_widget(xml, "dock1")),
gst_util_get_widget_arg(GTK_OBJECT(show),"widget")); gst_util_get_widget_arg(GTK_OBJECT(show),"widget"));
gst_bin_add(GST_BIN(video_render_thread),GST_ELEMENT(show)); gst_bin_add(GST_BIN(video_render_thread),GST_ELEMENT(show));
gst_element_add_ghost_pad(GST_ELEMENT(video_render_thread),
gst_element_get_pad(show,"sink"));
glade_xml_signal_autoconnect(xml); glade_xml_signal_autoconnect(xml);
video_render_queue = gst_elementfactory_make("queue","video_render_queue"); video_render_queue = gst_elementfactory_make("queue","video_render_queue");
gtk_object_set(GTK_OBJECT(video_render_queue),"max_level",BUFFER,NULL); gtk_object_set(GTK_OBJECT(video_render_queue),"max_level",BUFFER,NULL);
gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_queue));
gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_thread));
gst_pad_connect(gst_element_get_pad(video_render_queue,"src"), gst_pad_connect(gst_element_get_pad(video_render_queue,"src"),
gst_element_get_pad(video_render_thread,"sink")); gst_element_get_pad(show,"sink"));
gtk_object_set(GTK_OBJECT(video_render_thread),"create_thread",TRUE,NULL); gtk_object_set(GTK_OBJECT(video_render_thread),"create_thread",TRUE,NULL);
@ -381,15 +379,11 @@ main (int argc, char *argv[])
g_return_val_if_fail(audio_render_thread != NULL, -1); g_return_val_if_fail(audio_render_thread != NULL, -1);
audio_play = gst_elementfactory_make("audiosink","play_audio"); audio_play = gst_elementfactory_make("audiosink","play_audio");
gst_bin_add(GST_BIN(audio_render_thread),GST_ELEMENT(audio_play)); gst_bin_add(GST_BIN(audio_render_thread),GST_ELEMENT(audio_play));
gst_element_add_ghost_pad(GST_ELEMENT(audio_render_thread),
gst_element_get_pad(audio_play,"sink"));
audio_render_queue = gst_elementfactory_make("queue","audio_render_queue"); audio_render_queue = gst_elementfactory_make("queue","audio_render_queue");
gtk_object_set(GTK_OBJECT(audio_render_queue),"max_level",BUFFER,NULL); gtk_object_set(GTK_OBJECT(audio_render_queue),"max_level",BUFFER,NULL);
gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_queue));
gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_thread));
gst_pad_connect(gst_element_get_pad(audio_render_queue,"src"), gst_pad_connect(gst_element_get_pad(audio_render_queue,"src"),
gst_element_get_pad(audio_render_thread,"sink")); gst_element_get_pad(audio_play,"sink"));
gtk_object_set(GTK_OBJECT(audio_render_thread),"create_thread",TRUE,NULL); gtk_object_set(GTK_OBJECT(audio_render_thread),"create_thread",TRUE,NULL);
if (argc > 1) { if (argc > 1) {

View file

@ -56,6 +56,7 @@ void mpeg1_setup_audio_thread(GstPad *pad, GstElement *audio_render_queue, GstEl
g_return_if_fail(audio_thread != NULL); g_return_if_fail(audio_thread != NULL);
gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(parse_audio)); gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(parse_audio));
gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(decode)); gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(decode));
gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(audio_render_queue));
// set up pad connections // set up pad connections
gst_element_add_ghost_pad(GST_ELEMENT(audio_thread), gst_element_add_ghost_pad(GST_ELEMENT(audio_thread),
@ -100,6 +101,7 @@ void mpeg1_setup_video_thread(GstPad *pad, GstElement *video_render_queue, GstEl
g_return_if_fail(video_thread != NULL); g_return_if_fail(video_thread != NULL);
gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(parse_video)); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(parse_video));
gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(decode_video)); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(decode_video));
gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(video_render_queue));
// set up pad connections // set up pad connections
gst_element_add_ghost_pad(GST_ELEMENT(video_thread), gst_element_add_ghost_pad(GST_ELEMENT(video_thread),

View file

@ -66,6 +66,7 @@ void mpeg2_new_pad_created(GstElement *parse,GstPad *pad,GstElement *pipeline)
g_return_if_fail(audio_thread != NULL); g_return_if_fail(audio_thread != NULL);
gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(parse_audio)); gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(parse_audio));
gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(decode)); gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(decode));
gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(audio_render_queue));
// set up pad connections // set up pad connections
gst_element_add_ghost_pad(GST_ELEMENT(audio_thread), gst_element_add_ghost_pad(GST_ELEMENT(audio_thread),
@ -116,6 +117,7 @@ void mpeg2_setup_video_thread(GstPad *pad, GstElement *show, GstElement *pipelin
gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(parse_video)); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(parse_video));
gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(decode_video)); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(decode_video));
gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(merge_subtitles)); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(merge_subtitles));
gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(video_render_queue));
gst_bin_use_cothreads(GST_BIN(video_thread), FALSE); gst_bin_use_cothreads(GST_BIN(video_thread), FALSE);
// set up pad connections // set up pad connections

View file

@ -47,14 +47,14 @@ while test $# -gt 0; do
elif test @includedir@ != /usr/include ; then elif test @includedir@ != /usr/include ; then
includes=-I@includedir@ includes=-I@includedir@
fi fi
echo $includes `gtk-config --cflags` echo $includes `gtk-config --cflags gtk gthread`
;; ;;
--libs) --libs)
if test $prefix -ef @builddir@ ; then if test $prefix -ef @builddir@ ; then
echo @builddir@/libgst.la `gtk-config --libs` echo @builddir@/libgst.la `gtk-config --libs gtk gthread`
else else
libdirs=-L@libdir@ libdirs=-L@libdir@
echo $libdirs -lgst `gtk-config --libs` echo $libdirs -lgst `gtk-config --libs gtk gthread`
fi fi
;; ;;
*) *)

View file

@ -27,6 +27,7 @@
#include <gstqueue.h> #include <gstqueue.h>
#include <gst/gstarch.h>
GstElementDetails gst_queue_details = { GstElementDetails gst_queue_details = {
"Queue", "Queue",
@ -105,6 +106,7 @@ static void gst_queue_class_init(GstQueueClass *klass) {
static void gst_queue_init(GstQueue *queue) { static void gst_queue_init(GstQueue *queue) {
queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK); queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK);
gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad); gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad);
gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain); gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain);
queue->srcpad = gst_pad_new("src",GST_PAD_SRC); queue->srcpad = gst_pad_new("src",GST_PAD_SRC);
gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad); gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad);
@ -150,9 +152,10 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
/* we have to lock the queue since we span threads */ /* we have to lock the queue since we span threads */
DEBUG("queue: %s adding buffer %p\n", name, buf); DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self());
GST_LOCK(queue); GST_LOCK(queue);
DEBUG("queue: have queue lock\n");
if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) { if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) {
g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name);
@ -164,18 +167,17 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
if (queue->level_buffers >= queue->max_buffers) {
DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
while (queue->level_buffers >= queue->max_buffers) {
GST_UNLOCK(queue);
g_mutex_lock(queue->fulllock); g_mutex_lock(queue->fulllock);
while (queue->level_buffers >= queue->max_buffers) {
DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
STATUS("%s: O\n"); STATUS("%s: O\n");
GST_UNLOCK(queue);
g_cond_wait(queue->fullcond,queue->fulllock); g_cond_wait(queue->fullcond,queue->fulllock);
g_mutex_unlock(queue->fulllock);
GST_LOCK(queue); GST_LOCK(queue);
} STATUS("%s: O+\n");
DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
} }
g_mutex_unlock(queue->fulllock);
/* put the buffer on the head of the list */ /* put the buffer on the head of the list */
@ -192,18 +194,19 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
STATUS("%s: +\n"); STATUS("%s: +\n");
/* if we were empty, but aren't any more, signal a condition */ /* if we were empty, but aren't any more, signal a condition */
tosignal = (queue->level_buffers <= 0); tosignal = (queue->level_buffers >= 0);
queue->level_buffers++; queue->level_buffers++;
/* we can unlock now */ /* we can unlock now */
DEBUG("queue: %s chain %d end\n", name, queue->level_buffers); DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
GST_UNLOCK(queue); GST_UNLOCK(queue);
if (tosignal) { if (tosignal) {
g_mutex_lock(queue->emptylock); g_mutex_lock(queue->emptylock);
STATUS("%s: >\n");
g_cond_signal(queue->emptycond); g_cond_signal(queue->emptycond);
STATUS("%s: >>\n");
g_mutex_unlock(queue->emptylock); g_mutex_unlock(queue->emptylock);
//g_print(">");
} }
} }
@ -216,17 +219,20 @@ void gst_queue_push(GstConnection *connection) {
name = gst_element_get_name(GST_ELEMENT(queue)); name = gst_element_get_name(GST_ELEMENT(queue));
DEBUG("queue: %s push %d\n", name, queue->level_buffers); DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond);
/* have to lock for thread-safety */ /* have to lock for thread-safety */
DEBUG("queue: try have queue lock\n");
GST_LOCK(queue); GST_LOCK(queue);
DEBUG("queue: have queue lock\n");
while (!queue->level_buffers) { while (!queue->level_buffers) {
STATUS("%s: U released lock\n");
GST_UNLOCK(queue); GST_UNLOCK(queue);
g_mutex_lock(queue->emptylock); g_mutex_lock(queue->emptylock);
STATUS("%s: U\n");
g_cond_wait(queue->emptycond,queue->emptylock); g_cond_wait(queue->emptycond,queue->emptylock);
g_mutex_unlock(queue->emptylock); g_mutex_unlock(queue->emptylock);
GST_LOCK(queue); GST_LOCK(queue);
STATUS("%s: U- getting lock\n");
} }
front = queue->queue; front = queue->queue;
@ -240,13 +246,15 @@ void gst_queue_push(GstConnection *connection) {
if (tosignal) { if (tosignal) {
g_mutex_lock(queue->fulllock); g_mutex_lock(queue->fulllock);
STATUS("%s: < \n");
g_cond_signal(queue->fullcond); g_cond_signal(queue->fullcond);
STATUS("%s: << \n");
g_mutex_unlock(queue->fulllock); g_mutex_unlock(queue->fulllock);
} }
DEBUG("queue: %s pushing %d %p\n", name, queue->level_buffers, buf); //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf);
gst_pad_push(queue->srcpad,buf); gst_pad_push(queue->srcpad,buf);
DEBUG("queue: %s pushing %d done\n", name, queue->level_buffers); //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers);
/* unlock now */ /* unlock now */
} }

View file

@ -158,7 +158,7 @@ void mp2tomp1(GstElement *parser,GstPad *pad, GstElement *pipeline) {
gtk_object_set(GTK_OBJECT(smooth),"active",FALSE,NULL); gtk_object_set(GTK_OBJECT(smooth),"active",FALSE,NULL);
encode = gst_elementfactory_make("mpeg2enc","encode"); encode = gst_elementfactory_make("mpeg2enc","encode");
g_return_if_fail(encode != NULL); g_return_if_fail(encode != NULL);
gtk_object_set(GTK_OBJECT(encode),"frames_per_second",25.0,NULL); gtk_object_set(GTK_OBJECT(encode),"frames_per_second",29.97,NULL);
//encode = gst_elementfactory_make("mpeg1encoder","encode"); //encode = gst_elementfactory_make("mpeg1encoder","encode");
//gtk_object_set(GTK_OBJECT(show),"width",640, "height", 480,NULL); //gtk_object_set(GTK_OBJECT(show),"width",640, "height", 480,NULL);

View file

@ -50,9 +50,6 @@ int main(int argc,char *argv[])
gst_pad_connect(gst_element_get_pad(decoder,"src"), gst_pad_connect(gst_element_get_pad(decoder,"src"),
gst_element_get_pad(audiosink,"sink")); gst_element_get_pad(audiosink,"sink"));
/* find out how to handle this bin */
gst_bin_create_plan(GST_BIN(bin));
/* make it ready */ /* make it ready */
gst_element_set_state(bin, GST_STATE_READY); gst_element_set_state(bin, GST_STATE_READY);
/* start playing */ /* start playing */

View file

@ -46,9 +46,6 @@ int main(int argc,char *argv[])
exit(-1); exit(-1);
} }
/* find out how to handle this bin */
gst_bin_create_plan(GST_BIN(pipeline));
/* make it ready */ /* make it ready */
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY); gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY);
/* start playing */ /* start playing */

2
tests/old/examples/queue/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
Makefile
queue

View file

@ -0,0 +1,84 @@
#include <gst/gst.h>
gboolean playing;
/* eos will be called when the src element has an end of stream */
void eos(GstSrc *src, gpointer data)
{
g_print("have eos, quitting\n");
playing = FALSE;
}
int main(int argc,char *argv[])
{
GstElement *disksrc, *audiosink, *parse, *decode, *queue;
GstElement *bin;
GstElement *thread;
if (argc != 2) {
g_print("usage: %s <filename>\n", argv[0]);
exit(-1);
}
gst_init(&argc,&argv);
/* create a new thread to hold the elements */
thread = gst_thread_new("thread");
g_assert(thread != NULL);
/* create a new bin to hold the elements */
bin = gst_bin_new("bin");
g_assert(bin != NULL);
/* create a disk reader */
disksrc = gst_elementfactory_make("disksrc", "disk_source");
g_assert(disksrc != NULL);
gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
GTK_SIGNAL_FUNC(eos), thread);
parse = gst_elementfactory_make("mp3parse", "parse");
decode = gst_elementfactory_make("mpg123", "decode");
queue = gst_elementfactory_make("queue", "queue");
/* and an audio sink */
audiosink = gst_elementfactory_make("audiosink", "play_audio");
g_assert(audiosink != NULL);
/* add objects to the main pipeline */
gst_bin_add(GST_BIN(bin), disksrc);
gst_bin_add(GST_BIN(bin), parse);
gst_bin_add(GST_BIN(bin), decode);
gst_bin_add(GST_BIN(bin), queue);
gst_bin_add(GST_BIN(thread), audiosink);
gst_bin_add(GST_BIN(bin), thread);
gst_pad_connect(gst_element_get_pad(disksrc,"src"),
gst_element_get_pad(parse,"sink"));
gst_pad_connect(gst_element_get_pad(parse,"src"),
gst_element_get_pad(decode,"sink"));
gst_pad_connect(gst_element_get_pad(decode,"src"),
gst_element_get_pad(queue,"sink"));
gst_pad_connect(gst_element_get_pad(queue,"src"),
gst_element_get_pad(audiosink,"sink"));
/* make it ready */
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY);
/* start playing */
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING);
playing = TRUE;
while (playing) {
gst_bin_iterate(GST_BIN(bin));
}
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL);
exit(0);
}

2
tests/old/examples/queue2/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
Makefile
queue2

View file

@ -0,0 +1,81 @@
#include <gst/gst.h>
gboolean playing;
/* eos will be called when the src element has an end of stream */
void eos(GstSrc *src, gpointer data)
{
g_print("have eos, quitting\n");
playing = FALSE;
}
int main(int argc,char *argv[])
{
GstElement *disksrc, *audiosink, *queue;
GstElement *pipeline;
GstElement *thread;
if (argc != 2) {
g_print("usage: %s <filename>\n", argv[0]);
exit(-1);
}
gst_init(&argc,&argv);
/* create a new thread to hold the elements */
thread = gst_thread_new("thread");
g_assert(thread != NULL);
/* create a new bin to hold the elements */
pipeline = gst_pipeline_new("pipeline");
g_assert(pipeline != NULL);
/* create a disk reader */
disksrc = gst_elementfactory_make("disksrc", "disk_source");
g_assert(disksrc != NULL);
gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
GTK_SIGNAL_FUNC(eos), thread);
queue = gst_elementfactory_make("queue", "queue");
/* and an audio sink */
audiosink = gst_elementfactory_make("audiosink", "play_audio");
g_assert(audiosink != NULL);
/* add objects to the main pipeline */
gst_pipeline_add_src(GST_PIPELINE(pipeline), disksrc);
gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue);
gst_bin_add(GST_BIN(thread), audiosink);
gst_pad_connect(gst_element_get_pad(queue,"src"),
gst_element_get_pad(audiosink,"sink"));
gst_pad_set_type_id(gst_element_get_pad(queue, "sink"),
gst_pad_get_type_id(gst_element_get_pad(audiosink, "sink")));
if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) {
g_print("cannot autoplug pipeline\n");
exit(-1);
}
gst_bin_add(GST_BIN(pipeline), thread);
/* make it ready */
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY);
/* start playing */
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);
playing = TRUE;
while (playing) {
gst_bin_iterate(GST_BIN(pipeline));
}
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL);
exit(0);
}

2
tests/old/examples/queue3/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
Makefile
queue3

View file

@ -0,0 +1,85 @@
#include <gst/gst.h>
gboolean playing;
/* eos will be called when the src element has an end of stream */
void eos(GstSrc *src, gpointer data)
{
g_print("have eos, quitting\n");
playing = FALSE;
}
int main(int argc,char *argv[])
{
GstElement *disksrc, *audiosink, *queue, *parse, *decode;
GstElement *bin;
GstElement *thread;
if (argc != 2) {
g_print("usage: %s <filename>\n", argv[0]);
exit(-1);
}
gst_init(&argc,&argv);
/* create a new thread to hold the elements */
thread = gst_thread_new("thread");
g_assert(thread != NULL);
/* create a new bin to hold the elements */
bin = gst_bin_new("bin");
g_assert(bin != NULL);
/* create a disk reader */
disksrc = gst_elementfactory_make("disksrc", "disk_source");
g_assert(disksrc != NULL);
gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL);
gtk_signal_connect(GTK_OBJECT(disksrc),"eos",
GTK_SIGNAL_FUNC(eos), thread);
queue = gst_elementfactory_make("queue", "queue");
/* and an audio sink */
audiosink = gst_elementfactory_make("audiosink", "play_audio");
g_assert(audiosink != NULL);
parse = gst_elementfactory_make("mp3parse", "parse");
decode = gst_elementfactory_make("mpg123", "decode");
/* add objects to the main bin */
gst_bin_add(GST_BIN(bin), disksrc);
gst_bin_add(GST_BIN(bin), queue);
gst_bin_add(GST_BIN(thread), parse);
gst_bin_add(GST_BIN(thread), decode);
gst_bin_add(GST_BIN(thread), audiosink);
gst_pad_connect(gst_element_get_pad(disksrc,"src"),
gst_element_get_pad(queue,"sink"));
gst_pad_connect(gst_element_get_pad(queue,"src"),
gst_element_get_pad(parse,"sink"));
gst_pad_connect(gst_element_get_pad(parse,"src"),
gst_element_get_pad(decode,"sink"));
gst_pad_connect(gst_element_get_pad(decode,"src"),
gst_element_get_pad(audiosink,"sink"));
gst_bin_add(GST_BIN(bin), thread);
/* make it ready */
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY);
/* start playing */
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING);
playing = TRUE;
while (playing) {
gst_bin_iterate(GST_BIN(bin));
}
gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL);
exit(0);
}

View file

@ -53,9 +53,9 @@ int main(int argc,char *argv[])
exit(-1); exit(-1);
} }
gst_bin_remove(GST_BIN(pipeline), disksrc); //gst_bin_remove(GST_BIN(pipeline), disksrc);
gst_bin_add(GST_BIN(thread), disksrc); //gst_bin_add(GST_BIN(thread), disksrc);
gst_bin_add(GST_BIN(thread), GST_ELEMENT(pipeline)); gst_bin_add(GST_BIN(thread), GST_ELEMENT(pipeline));
/* make it ready */ /* make it ready */