diff --git a/ChangeLog b/ChangeLog index 7f624d23a0..089c346e5d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +2004-06-11 Thomas Vander Stichele + + * gst/gstqueue.c: + * gst/gstqueue.h: + fix removing from the wrong queue on event timeout + fix disposing of the event queue by casting correctly + add mutexes for handling the event queue + someone was sleeping when fixing queue last time around :) + 2004-06-10 Johan Dahlin * gst/gst.c (gst_init_check_with_popt_table): Do not fail on diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 0f6c076099..094464f0c9 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -288,6 +288,7 @@ gst_queue_init (GstQueue * queue) queue->item_del = g_cond_new (); queue->event_done = g_cond_new (); queue->events = g_queue_new (); + queue->event_lock = g_mutex_new (); queue->queue = g_queue_new (); GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, @@ -311,11 +312,14 @@ gst_queue_dispose (GObject * object) g_cond_free (queue->item_add); g_cond_free (queue->item_del); g_cond_free (queue->event_done); + g_mutex_lock (queue->event_lock); while (!g_queue_is_empty (queue->events)) { - GstEvent *event = g_queue_pop_head (queue->events); + GstQueueEventResponse *er = g_queue_pop_head (queue->events); - gst_event_unref (event); + gst_event_unref (er->event); } + g_mutex_unlock (queue->event_lock); + g_mutex_free (queue->event_lock); g_queue_free (queue->events); if (G_OBJECT_CLASS (parent_class)->dispose) @@ -390,15 +394,33 @@ static void gst_queue_handle_pending_events (GstQueue * queue) { /* check for events to send upstream */ + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "handling pending events, events queue of size %d", + g_queue_get_length (queue->events)); + g_mutex_lock (queue->event_lock); while (!g_queue_is_empty (queue->events)) { - GstQueueEventResponse *er = g_queue_pop_head (queue->events); + GstQueueEventResponse *er; - GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream"); + er = g_queue_pop_head (queue->events); + + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "sending event %p (%d) from event response %p upstream", + er->event, GST_EVENT_TYPE (er->event), er); + if (er->handled) { + /* change this to an assert when this file gets reviewed properly. */ + GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL), + ("already handled event %p (%d) from event response %p upstream", + er->event, GST_EVENT_TYPE (er->event), er)); + break; + } + g_mutex_unlock (queue->event_lock); er->ret = gst_pad_event_default (queue->srcpad, er->event); er->handled = TRUE; g_cond_signal (queue->event_done); + g_mutex_lock (queue->event_lock); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); } + g_mutex_unlock (queue->event_lock); } #define STATUS(queue, msg) \ @@ -770,6 +792,8 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); gboolean res; + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", + event, GST_EVENT_TYPE (event)); g_mutex_lock (queue->qlock); if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { @@ -778,7 +802,12 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) /* push the event to the queue and wait for upstream consumption */ er.event = event; er.handled = FALSE; + g_mutex_lock (queue->event_lock); + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "putting event %p (%d) on internal queue", event, + GST_EVENT_TYPE (event)); g_queue_push_tail (queue->events, &er); + g_mutex_unlock (queue->event_lock); GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Preparing for loop for event handler"); /* see the chain function on why this is here - it prevents a deadlock */ @@ -791,13 +820,17 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && !er.handled) { GST_CAT_WARNING_OBJECT (queue_dataflow, queue, - "timeout in upstream event handling"); - /* remove ourselves from the pending list. Since we're - * locked, others cannot reference this anymore. */ - queue->queue->head = g_list_remove (queue->queue->head, &er); - queue->queue->head = g_list_first (queue->queue->head); - queue->queue->tail = g_list_last (queue->queue->head); - queue->queue->length--; + "timeout in upstream event handling, dropping event %p (%s)", + er.event, GST_EVENT_TYPE (er.event)); + g_mutex_lock (queue->event_lock); + /* since this queue is for src events (ie upstream), this thread is + * the only one that is pushing stuff on it, so we're sure that + * it's still the tail element. FIXME: But in practice, we should use + * GList instead of GQueue for this so we can remove any element in + * the list. */ + g_queue_pop_tail (queue->events); + g_mutex_unlock (queue->event_lock); + gst_event_unref (er.event); res = FALSE; goto handled; } diff --git a/gst/gstqueue.h b/gst/gstqueue.h index 8177566355..aab0c3b4db 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -91,6 +91,7 @@ struct _GstQueue { GTimeVal *timeval; /* the timeout for the queue locking */ GQueue *events; /* upstream events get decoupled here */ + GMutex *event_lock; /* lock when handling the events queue */ GstCaps *negotiated_caps; diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 0f6c076099..094464f0c9 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -288,6 +288,7 @@ gst_queue_init (GstQueue * queue) queue->item_del = g_cond_new (); queue->event_done = g_cond_new (); queue->events = g_queue_new (); + queue->event_lock = g_mutex_new (); queue->queue = g_queue_new (); GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, @@ -311,11 +312,14 @@ gst_queue_dispose (GObject * object) g_cond_free (queue->item_add); g_cond_free (queue->item_del); g_cond_free (queue->event_done); + g_mutex_lock (queue->event_lock); while (!g_queue_is_empty (queue->events)) { - GstEvent *event = g_queue_pop_head (queue->events); + GstQueueEventResponse *er = g_queue_pop_head (queue->events); - gst_event_unref (event); + gst_event_unref (er->event); } + g_mutex_unlock (queue->event_lock); + g_mutex_free (queue->event_lock); g_queue_free (queue->events); if (G_OBJECT_CLASS (parent_class)->dispose) @@ -390,15 +394,33 @@ static void gst_queue_handle_pending_events (GstQueue * queue) { /* check for events to send upstream */ + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "handling pending events, events queue of size %d", + g_queue_get_length (queue->events)); + g_mutex_lock (queue->event_lock); while (!g_queue_is_empty (queue->events)) { - GstQueueEventResponse *er = g_queue_pop_head (queue->events); + GstQueueEventResponse *er; - GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream"); + er = g_queue_pop_head (queue->events); + + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "sending event %p (%d) from event response %p upstream", + er->event, GST_EVENT_TYPE (er->event), er); + if (er->handled) { + /* change this to an assert when this file gets reviewed properly. */ + GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL), + ("already handled event %p (%d) from event response %p upstream", + er->event, GST_EVENT_TYPE (er->event), er)); + break; + } + g_mutex_unlock (queue->event_lock); er->ret = gst_pad_event_default (queue->srcpad, er->event); er->handled = TRUE; g_cond_signal (queue->event_done); + g_mutex_lock (queue->event_lock); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); } + g_mutex_unlock (queue->event_lock); } #define STATUS(queue, msg) \ @@ -770,6 +792,8 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); gboolean res; + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", + event, GST_EVENT_TYPE (event)); g_mutex_lock (queue->qlock); if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { @@ -778,7 +802,12 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) /* push the event to the queue and wait for upstream consumption */ er.event = event; er.handled = FALSE; + g_mutex_lock (queue->event_lock); + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "putting event %p (%d) on internal queue", event, + GST_EVENT_TYPE (event)); g_queue_push_tail (queue->events, &er); + g_mutex_unlock (queue->event_lock); GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Preparing for loop for event handler"); /* see the chain function on why this is here - it prevents a deadlock */ @@ -791,13 +820,17 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && !er.handled) { GST_CAT_WARNING_OBJECT (queue_dataflow, queue, - "timeout in upstream event handling"); - /* remove ourselves from the pending list. Since we're - * locked, others cannot reference this anymore. */ - queue->queue->head = g_list_remove (queue->queue->head, &er); - queue->queue->head = g_list_first (queue->queue->head); - queue->queue->tail = g_list_last (queue->queue->head); - queue->queue->length--; + "timeout in upstream event handling, dropping event %p (%s)", + er.event, GST_EVENT_TYPE (er.event)); + g_mutex_lock (queue->event_lock); + /* since this queue is for src events (ie upstream), this thread is + * the only one that is pushing stuff on it, so we're sure that + * it's still the tail element. FIXME: But in practice, we should use + * GList instead of GQueue for this so we can remove any element in + * the list. */ + g_queue_pop_tail (queue->events); + g_mutex_unlock (queue->event_lock); + gst_event_unref (er.event); res = FALSE; goto handled; } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 8177566355..aab0c3b4db 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -91,6 +91,7 @@ struct _GstQueue { GTimeVal *timeval; /* the timeout for the queue locking */ GQueue *events; /* upstream events get decoupled here */ + GMutex *event_lock; /* lock when handling the events queue */ GstCaps *negotiated_caps;