fix queue event code

Original commit message from CVS:
fix queue event code
This commit is contained in:
Thomas Vander Stichele 2004-06-11 15:18:58 +00:00
parent b748b61e9f
commit e5c9a61a6b
5 changed files with 99 additions and 22 deletions

View file

@ -1,3 +1,12 @@
2004-06-11 Thomas Vander Stichele <thomas at apestaart dot org>
* gst/gstqueue.c:
* gst/gstqueue.h:
fix removing from the wrong queue on event timeout
fix disposing of the event queue by casting correctly
add mutexes for handling the event queue
someone was sleeping when fixing queue last time around :)
2004-06-10 Johan Dahlin <johan@gnome.org> 2004-06-10 Johan Dahlin <johan@gnome.org>
* gst/gst.c (gst_init_check_with_popt_table): Do not fail on * gst/gst.c (gst_init_check_with_popt_table): Do not fail on

View file

@ -288,6 +288,7 @@ gst_queue_init (GstQueue * queue)
queue->item_del = g_cond_new (); queue->item_del = g_cond_new ();
queue->event_done = g_cond_new (); queue->event_done = g_cond_new ();
queue->events = g_queue_new (); queue->events = g_queue_new ();
queue->event_lock = g_mutex_new ();
queue->queue = g_queue_new (); queue->queue = g_queue_new ();
GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
@ -311,11 +312,14 @@ gst_queue_dispose (GObject * object)
g_cond_free (queue->item_add); g_cond_free (queue->item_add);
g_cond_free (queue->item_del); g_cond_free (queue->item_del);
g_cond_free (queue->event_done); g_cond_free (queue->event_done);
g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) { while (!g_queue_is_empty (queue->events)) {
GstEvent *event = g_queue_pop_head (queue->events); GstQueueEventResponse *er = g_queue_pop_head (queue->events);
gst_event_unref (event); gst_event_unref (er->event);
} }
g_mutex_unlock (queue->event_lock);
g_mutex_free (queue->event_lock);
g_queue_free (queue->events); g_queue_free (queue->events);
if (G_OBJECT_CLASS (parent_class)->dispose) if (G_OBJECT_CLASS (parent_class)->dispose)
@ -390,15 +394,33 @@ static void
gst_queue_handle_pending_events (GstQueue * queue) gst_queue_handle_pending_events (GstQueue * queue)
{ {
/* check for events to send upstream */ /* check for events to send upstream */
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"handling pending events, events queue of size %d",
g_queue_get_length (queue->events));
g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) { while (!g_queue_is_empty (queue->events)) {
GstQueueEventResponse *er = g_queue_pop_head (queue->events); GstQueueEventResponse *er;
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream"); er = g_queue_pop_head (queue->events);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"sending event %p (%d) from event response %p upstream",
er->event, GST_EVENT_TYPE (er->event), er);
if (er->handled) {
/* change this to an assert when this file gets reviewed properly. */
GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
("already handled event %p (%d) from event response %p upstream",
er->event, GST_EVENT_TYPE (er->event), er));
break;
}
g_mutex_unlock (queue->event_lock);
er->ret = gst_pad_event_default (queue->srcpad, er->event); er->ret = gst_pad_event_default (queue->srcpad, er->event);
er->handled = TRUE; er->handled = TRUE;
g_cond_signal (queue->event_done); g_cond_signal (queue->event_done);
g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
} }
g_mutex_unlock (queue->event_lock);
} }
#define STATUS(queue, msg) \ #define STATUS(queue, msg) \
@ -770,6 +792,8 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
gboolean res; gboolean res;
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
event, GST_EVENT_TYPE (event));
g_mutex_lock (queue->qlock); g_mutex_lock (queue->qlock);
if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
@ -778,7 +802,12 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
/* push the event to the queue and wait for upstream consumption */ /* push the event to the queue and wait for upstream consumption */
er.event = event; er.event = event;
er.handled = FALSE; er.handled = FALSE;
g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"putting event %p (%d) on internal queue", event,
GST_EVENT_TYPE (event));
g_queue_push_tail (queue->events, &er); g_queue_push_tail (queue->events, &er);
g_mutex_unlock (queue->event_lock);
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"Preparing for loop for event handler"); "Preparing for loop for event handler");
/* see the chain function on why this is here - it prevents a deadlock */ /* see the chain function on why this is here - it prevents a deadlock */
@ -791,13 +820,17 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
!er.handled) { !er.handled) {
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"timeout in upstream event handling"); "timeout in upstream event handling, dropping event %p (%s)",
/* remove ourselves from the pending list. Since we're er.event, GST_EVENT_TYPE (er.event));
* locked, others cannot reference this anymore. */ g_mutex_lock (queue->event_lock);
queue->queue->head = g_list_remove (queue->queue->head, &er); /* since this queue is for src events (ie upstream), this thread is
queue->queue->head = g_list_first (queue->queue->head); * the only one that is pushing stuff on it, so we're sure that
queue->queue->tail = g_list_last (queue->queue->head); * it's still the tail element. FIXME: But in practice, we should use
queue->queue->length--; * GList instead of GQueue for this so we can remove any element in
* the list. */
g_queue_pop_tail (queue->events);
g_mutex_unlock (queue->event_lock);
gst_event_unref (er.event);
res = FALSE; res = FALSE;
goto handled; goto handled;
} }

View file

@ -91,6 +91,7 @@ struct _GstQueue {
GTimeVal *timeval; /* the timeout for the queue locking */ GTimeVal *timeval; /* the timeout for the queue locking */
GQueue *events; /* upstream events get decoupled here */ GQueue *events; /* upstream events get decoupled here */
GMutex *event_lock; /* lock when handling the events queue */
GstCaps *negotiated_caps; GstCaps *negotiated_caps;

View file

@ -288,6 +288,7 @@ gst_queue_init (GstQueue * queue)
queue->item_del = g_cond_new (); queue->item_del = g_cond_new ();
queue->event_done = g_cond_new (); queue->event_done = g_cond_new ();
queue->events = g_queue_new (); queue->events = g_queue_new ();
queue->event_lock = g_mutex_new ();
queue->queue = g_queue_new (); queue->queue = g_queue_new ();
GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue, GST_CAT_DEBUG_OBJECT (GST_CAT_THREAD, queue,
@ -311,11 +312,14 @@ gst_queue_dispose (GObject * object)
g_cond_free (queue->item_add); g_cond_free (queue->item_add);
g_cond_free (queue->item_del); g_cond_free (queue->item_del);
g_cond_free (queue->event_done); g_cond_free (queue->event_done);
g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) { while (!g_queue_is_empty (queue->events)) {
GstEvent *event = g_queue_pop_head (queue->events); GstQueueEventResponse *er = g_queue_pop_head (queue->events);
gst_event_unref (event); gst_event_unref (er->event);
} }
g_mutex_unlock (queue->event_lock);
g_mutex_free (queue->event_lock);
g_queue_free (queue->events); g_queue_free (queue->events);
if (G_OBJECT_CLASS (parent_class)->dispose) if (G_OBJECT_CLASS (parent_class)->dispose)
@ -390,15 +394,33 @@ static void
gst_queue_handle_pending_events (GstQueue * queue) gst_queue_handle_pending_events (GstQueue * queue)
{ {
/* check for events to send upstream */ /* check for events to send upstream */
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"handling pending events, events queue of size %d",
g_queue_get_length (queue->events));
g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) { while (!g_queue_is_empty (queue->events)) {
GstQueueEventResponse *er = g_queue_pop_head (queue->events); GstQueueEventResponse *er;
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "sending event upstream"); er = g_queue_pop_head (queue->events);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"sending event %p (%d) from event response %p upstream",
er->event, GST_EVENT_TYPE (er->event), er);
if (er->handled) {
/* change this to an assert when this file gets reviewed properly. */
GST_ELEMENT_ERROR (queue, CORE, EVENT, (NULL),
("already handled event %p (%d) from event response %p upstream",
er->event, GST_EVENT_TYPE (er->event), er));
break;
}
g_mutex_unlock (queue->event_lock);
er->ret = gst_pad_event_default (queue->srcpad, er->event); er->ret = gst_pad_event_default (queue->srcpad, er->event);
er->handled = TRUE; er->handled = TRUE;
g_cond_signal (queue->event_done); g_cond_signal (queue->event_done);
g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
} }
g_mutex_unlock (queue->event_lock);
} }
#define STATUS(queue, msg) \ #define STATUS(queue, msg) \
@ -770,6 +792,8 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
gboolean res; gboolean res;
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
event, GST_EVENT_TYPE (event));
g_mutex_lock (queue->qlock); g_mutex_lock (queue->qlock);
if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
@ -778,7 +802,12 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
/* push the event to the queue and wait for upstream consumption */ /* push the event to the queue and wait for upstream consumption */
er.event = event; er.event = event;
er.handled = FALSE; er.handled = FALSE;
g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"putting event %p (%d) on internal queue", event,
GST_EVENT_TYPE (event));
g_queue_push_tail (queue->events, &er); g_queue_push_tail (queue->events, &er);
g_mutex_unlock (queue->event_lock);
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"Preparing for loop for event handler"); "Preparing for loop for event handler");
/* see the chain function on why this is here - it prevents a deadlock */ /* see the chain function on why this is here - it prevents a deadlock */
@ -791,13 +820,17 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
!er.handled) { !er.handled) {
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"timeout in upstream event handling"); "timeout in upstream event handling, dropping event %p (%s)",
/* remove ourselves from the pending list. Since we're er.event, GST_EVENT_TYPE (er.event));
* locked, others cannot reference this anymore. */ g_mutex_lock (queue->event_lock);
queue->queue->head = g_list_remove (queue->queue->head, &er); /* since this queue is for src events (ie upstream), this thread is
queue->queue->head = g_list_first (queue->queue->head); * the only one that is pushing stuff on it, so we're sure that
queue->queue->tail = g_list_last (queue->queue->head); * it's still the tail element. FIXME: But in practice, we should use
queue->queue->length--; * GList instead of GQueue for this so we can remove any element in
* the list. */
g_queue_pop_tail (queue->events);
g_mutex_unlock (queue->event_lock);
gst_event_unref (er.event);
res = FALSE; res = FALSE;
goto handled; goto handled;
} }

View file

@ -91,6 +91,7 @@ struct _GstQueue {
GTimeVal *timeval; /* the timeout for the queue locking */ GTimeVal *timeval; /* the timeout for the queue locking */
GQueue *events; /* upstream events get decoupled here */ GQueue *events; /* upstream events get decoupled here */
GMutex *event_lock; /* lock when handling the events queue */
GstCaps *negotiated_caps; GstCaps *negotiated_caps;