gst/: Fix state changes for non sinks. We now change sinks, then elements with unconnected srcpads, then the rest.

Original commit message from CVS:
* gst/gstbin.c: (bin_element_is_sink), (has_ancestor),
(bin_element_is_semi_sink), (append_child), (gst_bin_change_state):
* gst/gstpad.c: (gst_pad_set_active), (gst_pad_link_prepare),
(gst_pad_link), (gst_pad_accept_caps), (gst_pad_query),
(gst_pad_send_event), (gst_pad_start_task):
* gst/gstqueue.c: (gst_queue_init), (gst_queue_locked_flush),
(gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop),
(gst_queue_sink_activate), (gst_queue_src_activate),
(gst_queue_change_state):
* gst/gstqueue.h:
Fix state changes for non sinks. We now change sinks, then elements
with unconnected srcpads, then the rest.
More efficient queue unlocking in flush and state changes.
Set the pad activate mode even if it does not have an activate
function.
This commit is contained in:
Wim Taymans 2005-05-25 19:33:39 +00:00
parent c6d2acd587
commit 56d9730d20
7 changed files with 291 additions and 76 deletions

View file

@ -1,3 +1,21 @@
2005-05-25 Wim Taymans <wim@fluendo.com>
* gst/gstbin.c: (bin_element_is_sink), (has_ancestor),
(bin_element_is_semi_sink), (append_child), (gst_bin_change_state):
* gst/gstpad.c: (gst_pad_set_active), (gst_pad_link_prepare),
(gst_pad_link), (gst_pad_accept_caps), (gst_pad_query),
(gst_pad_send_event), (gst_pad_start_task):
* gst/gstqueue.c: (gst_queue_init), (gst_queue_locked_flush),
(gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop),
(gst_queue_sink_activate), (gst_queue_src_activate),
(gst_queue_change_state):
* gst/gstqueue.h:
Fix state changes for non sinks. We now change sinks, then elements
with unconnected srcpads, then the rest.
More efficient queue unlocking in flush and state changes.
Set the pad activate mode even if it does not have an activate
function.
2005-05-25 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* gst/base/gstbasesrc.c: (gst_basesrc_activate):

View file

@ -686,26 +686,98 @@ gst_bin_iterate_recurse (GstBin * bin)
return result;
}
/* returns 0 when TRUE because this is a GCompareFunc */
/* MT safe */
static gint
bin_element_is_sink (GstElement * child, GstBin * bin)
{
gboolean is_sink;
/* we lock the child here for the remainder of the function to
* get its name safely. */
GST_LOCK (child);
if (GST_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK)) {
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
"finding child %s as sink", GST_OBJECT_NAME (child));
is_sink = GST_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK);
GST_UNLOCK (child);
/* returns 0 because this is a GCompareFunc */
return 0;
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
"child %s %s sink", GST_OBJECT_NAME (child), is_sink ? "is" : "is not");
return is_sink ? 0 : 1;
}
static gboolean
has_ancestor (GstObject * object, GstObject * ancestor)
{
GstObject *parent;
gboolean result = FALSE;
if (object == NULL)
return FALSE;
if (object == ancestor)
return TRUE;
parent = gst_object_get_parent (object);
result = has_ancestor (parent, ancestor);
if (parent)
gst_object_unref (GST_OBJECT_CAST (parent));
return result;
}
/* returns 0 when TRUE because this is a GCompareFunc.
* This function returns elements that have no connected srcpads and
* are therefore not reachable from a real sink. */
/* MT safe */
static gint
bin_element_is_semi_sink (GstElement * child, GstBin * bin)
{
int ret = 1;
/* we lock the child here for the remainder of the function to
* get its pads and name safely. */
GST_LOCK (child);
/* check if this is a sink element, these are the elements
* without (linked) source pads. */
if (child->numsrcpads == 0) {
/* shortcut */
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
"adding child %s as sink", GST_OBJECT_NAME (child));
ret = 0;
} else {
/* loop over all pads, try to figure out if this element
* is a semi sink because it has no linked source pads */
GList *pads;
gboolean connected_src = FALSE;
for (pads = child->srcpads; pads; pads = g_list_next (pads)) {
GstPad *peer;
if ((peer = gst_pad_get_peer (GST_PAD_CAST (pads->data)))) {
connected_src =
has_ancestor (GST_OBJECT_CAST (peer), GST_OBJECT_CAST (bin));
gst_object_unref (GST_OBJECT_CAST (peer));
if (connected_src) {
break;
}
}
}
if (connected_src) {
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
"not adding child %s as sink: linked source pads",
GST_OBJECT_NAME (child));
} else {
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
"child %s is not a sink", GST_OBJECT_NAME (child));
GST_UNLOCK (child);
return 1;
"adding child %s as sink since it has unlinked source pads in this bin",
GST_OBJECT_NAME (child));
ret = 0;
}
}
GST_UNLOCK (child);
return ret;
}
static gint
sink_iterator_filter (GstElement * child, GstBin * bin)
@ -828,6 +900,12 @@ restart:
return ret;
}
static void
append_child (gpointer child, GQueue * queue)
{
g_queue_push_tail (queue, child);
}
/* this function is called with the STATE_LOCK held. It works
* as follows:
*
@ -852,7 +930,8 @@ gst_bin_change_state (GstElement * element)
GList *children;
guint32 children_cookie;
GQueue *elem_queue; /* list of elements waiting for a state change */
GQueue *temp; /* temp queue of non sinks */
GQueue *semi_queue; /* list of elements with no connected srcpads */
GQueue *temp; /* temp queue of leftovers */
bin = GST_BIN (element);
@ -871,6 +950,7 @@ gst_bin_change_state (GstElement * element)
/* all elements added to this queue should have their refcount
* incremented */
elem_queue = g_queue_new ();
semi_queue = g_queue_new ();
temp = g_queue_new ();
/* first step, find all sink elements, these are the elements
@ -887,6 +967,8 @@ restart:
if (bin_element_is_sink (child, bin) == 0) {
g_queue_push_tail (elem_queue, child);
} else if (bin_element_is_semi_sink (child, bin) == 0) {
g_queue_push_tail (semi_queue, child);
} else {
g_queue_push_tail (temp, child);
}
@ -895,8 +977,10 @@ restart:
if (G_UNLIKELY (children_cookie != bin->children_cookie)) {
/* undo what we had */
g_queue_foreach (elem_queue, (GFunc) gst_object_unref, NULL);
g_queue_foreach (semi_queue, (GFunc) gst_object_unref, NULL);
g_queue_foreach (temp, (GFunc) gst_object_unref, NULL);
while (g_queue_pop_head (elem_queue));
while (g_queue_pop_head (semi_queue));
while (g_queue_pop_head (temp));
goto restart;
}
@ -905,12 +989,17 @@ restart:
}
GST_UNLOCK (bin);
/* now change state for semi sink elements first so add them in
* front of the other elements */
g_queue_foreach (temp, (GFunc) append_child, semi_queue);
g_queue_free (temp);
/* can be the case for a bin like ( identity ) */
if (g_queue_is_empty (elem_queue) && !g_queue_is_empty (temp)) {
if (g_queue_is_empty (elem_queue) && !g_queue_is_empty (semi_queue)) {
GQueue *q = elem_queue;
elem_queue = temp;
temp = q;
elem_queue = semi_queue;
semi_queue = q;
}
/* second step, change state of elements in the queue */
@ -921,15 +1010,15 @@ restart:
/* take element */
qelement = g_queue_pop_head (elem_queue);
/* we don't need it in the temp anymore */
g_queue_remove_all (temp, qelement);
/* we don't need it in the semi_queue anymore */
g_queue_remove_all (semi_queue, qelement);
/* if queue is empty now, continue with a non-sink */
if (g_queue_is_empty (elem_queue)) {
GstElement *non_sink;
GST_DEBUG ("sinks and upstream elements exhausted");
non_sink = g_queue_pop_head (temp);
non_sink = g_queue_pop_head (semi_queue);
if (non_sink) {
GST_DEBUG ("found lefover non-sink %s", GST_OBJECT_NAME (non_sink));
g_queue_push_tail (elem_queue, non_sink);
@ -1044,8 +1133,8 @@ exit:
/* release refcounts in queue, should normally be empty */
g_queue_foreach (elem_queue, (GFunc) gst_object_unref, NULL);
g_queue_free (elem_queue);
g_queue_foreach (temp, (GFunc) gst_object_unref, NULL);
g_queue_free (temp);
g_queue_foreach (semi_queue, (GFunc) gst_object_unref, NULL);
g_queue_free (semi_queue);
return ret;
}

View file

@ -506,10 +506,9 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode)
GST_LOCK (realpad);
if (result == FALSE)
goto activate_error;
}
/* store the mode */
GST_RPAD_ACTIVATE_MODE (realpad) = mode;
}
/* when going to active allow data passing now */
if (active) {
@ -529,7 +528,8 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode)
was_ok:
{
GST_CAT_DEBUG (GST_CAT_PADS,
"pad %s:%s was active", GST_DEBUG_PAD_NAME (realpad));
"pad %s:%s was active, old %d, new %d",
GST_DEBUG_PAD_NAME (realpad), old, mode);
GST_UNLOCK (realpad);
return TRUE;
}

View file

@ -141,6 +141,7 @@ static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer);
static void gst_queue_locked_flush (GstQueue * queue);
static gboolean gst_queue_src_activate (GstPad * pad, GstActivateMode mode);
static gboolean gst_queue_sink_activate (GstPad * pad, GstActivateMode mode);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
@ -299,6 +300,8 @@ gst_queue_init (GstQueue * queue)
"sink");
gst_pad_set_chain_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_chain));
gst_pad_set_activate_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_sink_activate));
gst_pad_set_event_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event));
gst_pad_set_link_function (queue->sinkpad,
@ -338,7 +341,7 @@ gst_queue_init (GstQueue * queue)
queue->leaky = GST_QUEUE_NO_LEAK;
queue->may_deadlock = TRUE;
queue->block_timeout = GST_CLOCK_TIME_NONE;
queue->flush = FALSE;
queue->flushing = FALSE;
queue->qlock = g_mutex_new ();
queue->item_add = g_cond_new ();
@ -446,9 +449,6 @@ gst_queue_locked_flush (GstQueue * queue)
queue->cur_level.bytes = 0;
queue->cur_level.time = 0;
/* make sure any pending buffers to be added are flushed too */
queue->flush = TRUE;
/* we deleted something... */
g_cond_signal (queue->item_del);
}
@ -483,19 +483,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
/* forward event */
gst_pad_event_default (pad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
GST_QUEUE_MUTEX_LOCK;
queue->flushing = FALSE;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad);
GST_QUEUE_MUTEX_UNLOCK;
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
queue->flushing = TRUE;
gst_queue_locked_flush (queue);
/* unblock the loop function */
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
STATUS (queue, "after flush");
/* unblock the loop function */
g_cond_signal (queue->item_add);
/* make sure it stops */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
@ -628,7 +631,10 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
if (queue->flushing)
#if 0
if (GST_RPAD_IS_FLUSHING (pad))
#endif
goto out_flushing;
/* if there's a pending state change for this queue
@ -644,9 +650,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
break;
}
}
#if 0
/* we are flushing */
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
#endif
g_queue_push_tail (queue->queue, buffer);
@ -703,17 +711,30 @@ restart:
while (gst_queue_is_empty (queue)) {
STATUS (queue, "waiting for item_add");
#if 0
/* we are flushing */
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
goto out_flushing;
#endif
if (queue->flushing)
goto out_flushing;
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
g_thread_self ());
g_cond_wait (queue->item_add, queue->qlock);
if (queue->flushing)
goto out_flushing;
#if 0
/* we got unlocked because we are flushing */
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
goto out_flushing;
#endif
GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
g_thread_self ());
@ -842,6 +863,37 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
return TRUE;
}
static gboolean
gst_queue_sink_activate (GstPad * pad, GstActivateMode mode)
{
gboolean result = FALSE;
GstQueue *queue;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
switch (mode) {
case GST_ACTIVATE_PUSH:
queue->flushing = FALSE;
result = TRUE;
break;
case GST_ACTIVATE_PULL:
result = FALSE;
break;
case GST_ACTIVATE_NONE:
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
queue->flushing = TRUE;
gst_queue_locked_flush (queue);
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
break;
}
return result;
}
static gboolean
gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
{
@ -850,22 +902,30 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
if (mode == GST_ACTIVATE_PUSH) {
switch (mode) {
case GST_ACTIVATE_PUSH:
GST_QUEUE_MUTEX_LOCK;
queue->flushing = FALSE;
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
} else {
GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_ACTIVATE_PULL:
result = FALSE;
break;
case GST_ACTIVATE_NONE:
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
queue->flushing = TRUE;
g_cond_signal (queue->item_add);
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
break;
}
return result;
}
static GstElementStateReturn
gst_queue_change_state (GstElement * element)
{
@ -883,9 +943,6 @@ gst_queue_change_state (GstElement * element)
case GST_STATE_NULL_TO_READY:
break;
case GST_STATE_READY_TO_PAUSED:
GST_QUEUE_MUTEX_LOCK;
gst_queue_locked_flush (queue);
GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_STATE_PAUSED_TO_PLAYING:
break;
@ -899,9 +956,6 @@ gst_queue_change_state (GstElement * element)
case GST_STATE_PLAYING_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_READY:
GST_QUEUE_MUTEX_LOCK;
gst_queue_locked_flush (queue);
GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_STATE_READY_TO_NULL:
break;

View file

@ -79,7 +79,7 @@ struct _GstQueue {
/* it the queue should fail on possible deadlocks */
gboolean may_deadlock;
gboolean flush;
gboolean flushing;
GMutex *qlock; /* lock for queue (vs object lock) */
GCond *item_add; /* signals buffers now available for reading */

View file

@ -141,6 +141,7 @@ static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer);
static void gst_queue_locked_flush (GstQueue * queue);
static gboolean gst_queue_src_activate (GstPad * pad, GstActivateMode mode);
static gboolean gst_queue_sink_activate (GstPad * pad, GstActivateMode mode);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
@ -299,6 +300,8 @@ gst_queue_init (GstQueue * queue)
"sink");
gst_pad_set_chain_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_chain));
gst_pad_set_activate_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_sink_activate));
gst_pad_set_event_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event));
gst_pad_set_link_function (queue->sinkpad,
@ -338,7 +341,7 @@ gst_queue_init (GstQueue * queue)
queue->leaky = GST_QUEUE_NO_LEAK;
queue->may_deadlock = TRUE;
queue->block_timeout = GST_CLOCK_TIME_NONE;
queue->flush = FALSE;
queue->flushing = FALSE;
queue->qlock = g_mutex_new ();
queue->item_add = g_cond_new ();
@ -446,9 +449,6 @@ gst_queue_locked_flush (GstQueue * queue)
queue->cur_level.bytes = 0;
queue->cur_level.time = 0;
/* make sure any pending buffers to be added are flushed too */
queue->flush = TRUE;
/* we deleted something... */
g_cond_signal (queue->item_del);
}
@ -483,19 +483,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
/* forward event */
gst_pad_event_default (pad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
GST_QUEUE_MUTEX_LOCK;
queue->flushing = FALSE;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad);
GST_QUEUE_MUTEX_UNLOCK;
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
queue->flushing = TRUE;
gst_queue_locked_flush (queue);
/* unblock the loop function */
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
STATUS (queue, "after flush");
/* unblock the loop function */
g_cond_signal (queue->item_add);
/* make sure it stops */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
@ -628,7 +631,10 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
if (queue->flushing)
#if 0
if (GST_RPAD_IS_FLUSHING (pad))
#endif
goto out_flushing;
/* if there's a pending state change for this queue
@ -644,9 +650,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
break;
}
}
#if 0
/* we are flushing */
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
#endif
g_queue_push_tail (queue->queue, buffer);
@ -703,17 +711,30 @@ restart:
while (gst_queue_is_empty (queue)) {
STATUS (queue, "waiting for item_add");
#if 0
/* we are flushing */
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
goto out_flushing;
#endif
if (queue->flushing)
goto out_flushing;
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
g_thread_self ());
g_cond_wait (queue->item_add, queue->qlock);
if (queue->flushing)
goto out_flushing;
#if 0
/* we got unlocked because we are flushing */
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
goto out_flushing;
#endif
GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
g_thread_self ());
@ -842,6 +863,37 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
return TRUE;
}
static gboolean
gst_queue_sink_activate (GstPad * pad, GstActivateMode mode)
{
gboolean result = FALSE;
GstQueue *queue;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
switch (mode) {
case GST_ACTIVATE_PUSH:
queue->flushing = FALSE;
result = TRUE;
break;
case GST_ACTIVATE_PULL:
result = FALSE;
break;
case GST_ACTIVATE_NONE:
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
queue->flushing = TRUE;
gst_queue_locked_flush (queue);
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
break;
}
return result;
}
static gboolean
gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
{
@ -850,22 +902,30 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
if (mode == GST_ACTIVATE_PUSH) {
switch (mode) {
case GST_ACTIVATE_PUSH:
GST_QUEUE_MUTEX_LOCK;
queue->flushing = FALSE;
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
} else {
GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_ACTIVATE_PULL:
result = FALSE;
break;
case GST_ACTIVATE_NONE:
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
queue->flushing = TRUE;
g_cond_signal (queue->item_add);
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
break;
}
return result;
}
static GstElementStateReturn
gst_queue_change_state (GstElement * element)
{
@ -883,9 +943,6 @@ gst_queue_change_state (GstElement * element)
case GST_STATE_NULL_TO_READY:
break;
case GST_STATE_READY_TO_PAUSED:
GST_QUEUE_MUTEX_LOCK;
gst_queue_locked_flush (queue);
GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_STATE_PAUSED_TO_PLAYING:
break;
@ -899,9 +956,6 @@ gst_queue_change_state (GstElement * element)
case GST_STATE_PLAYING_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_READY:
GST_QUEUE_MUTEX_LOCK;
gst_queue_locked_flush (queue);
GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_STATE_READY_TO_NULL:
break;

View file

@ -79,7 +79,7 @@ struct _GstQueue {
/* it the queue should fail on possible deadlocks */
gboolean may_deadlock;
gboolean flush;
gboolean flushing;
GMutex *qlock; /* lock for queue (vs object lock) */
GCond *item_add; /* signals buffers now available for reading */