Fixing two bugs: 1) I had commented out an unlock. duh.

Original commit message from CVS:
Fixing two bugs:

1) I had commented out an unlock.  duh.
2) changed the _get function to return the buffer rather than call _push

Also uncommented some debugging I'd turned off.  Need to solve the verbosity
problem somehow, I think by way of debug levels as well as info levels...
This commit is contained in:
Erik Walthinsen 2000-12-21 01:27:27 +00:00
parent c287566d0c
commit e8bb90705f
2 changed files with 38 additions and 34 deletions

View file

@ -58,8 +58,8 @@ static void gst_queue_init (GstQueue *queue);
static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_queue_get (GstPad *pad);
static void gst_queue_chain (GstPad *pad, GstBuffer *buf); static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
static GstBuffer * gst_queue_get (GstPad *pad);
static void gst_queue_flush (GstQueue *queue); static void gst_queue_flush (GstQueue *queue);
@ -174,20 +174,20 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
name = gst_element_get_name (GST_ELEMENT (queue)); name = gst_element_get_name (GST_ELEMENT (queue));
/* we have to lock the queue since we span threads */ /* we have to lock the queue since we span threads */
// DEBUG("queue: try have queue lock\n"); DEBUG("queue: try have queue lock\n");
GST_LOCK (queue); GST_LOCK (queue);
// DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ()); DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ());
// DEBUG("queue: have queue lock\n"); DEBUG("queue: have queue lock\n");
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) { if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
gst_queue_flush (queue); gst_queue_flush (queue);
} }
// DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
while (queue->level_buffers >= queue->max_buffers) { while (queue->level_buffers >= queue->max_buffers) {
// DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
STATUS("%s: O\n"); STATUS("%s: O\n");
GST_UNLOCK (queue); GST_UNLOCK (queue);
g_mutex_lock (queue->fulllock); g_mutex_lock (queue->fulllock);
@ -195,7 +195,7 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
g_mutex_unlock (queue->fulllock); g_mutex_unlock (queue->fulllock);
GST_LOCK (queue); GST_LOCK (queue);
STATUS("%s: O+\n"); STATUS("%s: O+\n");
// DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
} }
/* put the buffer on the tail of the list */ /* put the buffer on the tail of the list */
@ -208,7 +208,7 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
queue->level_buffers++; queue->level_buffers++;
/* we can unlock now */ /* we can unlock now */
// DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
GST_UNLOCK (queue); GST_UNLOCK (queue);
if (tosignal) { if (tosignal) {
@ -220,7 +220,7 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
} }
} }
static void static GstBuffer *
gst_queue_get (GstPad *pad) gst_queue_get (GstPad *pad)
{ {
GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad)); GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
@ -232,14 +232,14 @@ gst_queue_get (GstPad *pad)
name = gst_element_get_name (GST_ELEMENT (queue)); name = gst_element_get_name (GST_ELEMENT (queue));
/* have to lock for thread-safety */ /* have to lock for thread-safety */
// DEBUG("queue: %s try have queue lock\n", name); DEBUG("queue: %s try have queue lock\n", name);
GST_LOCK (queue); GST_LOCK (queue);
// DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
// DEBUG("queue: %s have queue lock\n", name); DEBUG("queue: %s have queue lock\n", name);
while (!queue->level_buffers) { while (!queue->level_buffers) {
// STATUS("queue: %s U released lock\n"); STATUS("queue: %s U released lock\n");
// GST_UNLOCK (queue); GST_UNLOCK (queue);
g_mutex_lock (queue->emptylock); g_mutex_lock (queue->emptylock);
g_cond_wait (queue->emptycond, queue->emptylock); g_cond_wait (queue->emptycond, queue->emptylock);
g_mutex_unlock (queue->emptylock); g_mutex_unlock (queue->emptylock);
@ -249,9 +249,10 @@ gst_queue_get (GstPad *pad)
front = queue->queue; front = queue->queue;
buf = (GstBuffer *)(front->data); buf = (GstBuffer *)(front->data);
DEBUG("retrieved buffer %p from queue\n",buf);
queue->queue = g_slist_remove_link (queue->queue, front); queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front); g_slist_free (front);
queue->level_buffers--; queue->level_buffers--;
// STATUS("%s: -\n"); // STATUS("%s: -\n");
g_print("(%s:%s)- ",GST_DEBUG_PAD_NAME(pad)); g_print("(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
@ -267,9 +268,10 @@ gst_queue_get (GstPad *pad)
} }
// DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); // DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf);
gst_pad_push (queue->srcpad, buf); // gst_pad_push (queue->srcpad, buf);
// DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); // DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers);
return buf;
/* unlock now */ /* unlock now */
} }

View file

@ -58,8 +58,8 @@ static void gst_queue_init (GstQueue *queue);
static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_queue_get (GstPad *pad);
static void gst_queue_chain (GstPad *pad, GstBuffer *buf); static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
static GstBuffer * gst_queue_get (GstPad *pad);
static void gst_queue_flush (GstQueue *queue); static void gst_queue_flush (GstQueue *queue);
@ -174,20 +174,20 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
name = gst_element_get_name (GST_ELEMENT (queue)); name = gst_element_get_name (GST_ELEMENT (queue));
/* we have to lock the queue since we span threads */ /* we have to lock the queue since we span threads */
// DEBUG("queue: try have queue lock\n"); DEBUG("queue: try have queue lock\n");
GST_LOCK (queue); GST_LOCK (queue);
// DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ()); DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ());
// DEBUG("queue: have queue lock\n"); DEBUG("queue: have queue lock\n");
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) { if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
gst_queue_flush (queue); gst_queue_flush (queue);
} }
// DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
while (queue->level_buffers >= queue->max_buffers) { while (queue->level_buffers >= queue->max_buffers) {
// DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
STATUS("%s: O\n"); STATUS("%s: O\n");
GST_UNLOCK (queue); GST_UNLOCK (queue);
g_mutex_lock (queue->fulllock); g_mutex_lock (queue->fulllock);
@ -195,7 +195,7 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
g_mutex_unlock (queue->fulllock); g_mutex_unlock (queue->fulllock);
GST_LOCK (queue); GST_LOCK (queue);
STATUS("%s: O+\n"); STATUS("%s: O+\n");
// DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
} }
/* put the buffer on the tail of the list */ /* put the buffer on the tail of the list */
@ -208,7 +208,7 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
queue->level_buffers++; queue->level_buffers++;
/* we can unlock now */ /* we can unlock now */
// DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
GST_UNLOCK (queue); GST_UNLOCK (queue);
if (tosignal) { if (tosignal) {
@ -220,7 +220,7 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
} }
} }
static void static GstBuffer *
gst_queue_get (GstPad *pad) gst_queue_get (GstPad *pad)
{ {
GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad)); GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
@ -232,14 +232,14 @@ gst_queue_get (GstPad *pad)
name = gst_element_get_name (GST_ELEMENT (queue)); name = gst_element_get_name (GST_ELEMENT (queue));
/* have to lock for thread-safety */ /* have to lock for thread-safety */
// DEBUG("queue: %s try have queue lock\n", name); DEBUG("queue: %s try have queue lock\n", name);
GST_LOCK (queue); GST_LOCK (queue);
// DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
// DEBUG("queue: %s have queue lock\n", name); DEBUG("queue: %s have queue lock\n", name);
while (!queue->level_buffers) { while (!queue->level_buffers) {
// STATUS("queue: %s U released lock\n"); STATUS("queue: %s U released lock\n");
// GST_UNLOCK (queue); GST_UNLOCK (queue);
g_mutex_lock (queue->emptylock); g_mutex_lock (queue->emptylock);
g_cond_wait (queue->emptycond, queue->emptylock); g_cond_wait (queue->emptycond, queue->emptylock);
g_mutex_unlock (queue->emptylock); g_mutex_unlock (queue->emptylock);
@ -249,9 +249,10 @@ gst_queue_get (GstPad *pad)
front = queue->queue; front = queue->queue;
buf = (GstBuffer *)(front->data); buf = (GstBuffer *)(front->data);
DEBUG("retrieved buffer %p from queue\n",buf);
queue->queue = g_slist_remove_link (queue->queue, front); queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front); g_slist_free (front);
queue->level_buffers--; queue->level_buffers--;
// STATUS("%s: -\n"); // STATUS("%s: -\n");
g_print("(%s:%s)- ",GST_DEBUG_PAD_NAME(pad)); g_print("(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
@ -267,9 +268,10 @@ gst_queue_get (GstPad *pad)
} }
// DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); // DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf);
gst_pad_push (queue->srcpad, buf); // gst_pad_push (queue->srcpad, buf);
// DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); // DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers);
return buf;
/* unlock now */ /* unlock now */
} }