mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-09-03 18:53:58 +00:00
cleanup: remove writer/reader booleans, just signal everytime bugfix: signal not_full after flush (Martin Janzen)
Original commit message from CVS: cleanup: remove writer/reader booleans, just signal everytime bugfix: signal not_full after flush (Martin Janzen)
This commit is contained in:
parent
729bbc9c0a
commit
fec6a61298
4 changed files with 20 additions and 70 deletions
|
@ -239,8 +239,6 @@ gst_queue_init (GstQueue *queue)
|
||||||
queue->flush = FALSE;
|
queue->flush = FALSE;
|
||||||
|
|
||||||
queue->qlock = g_mutex_new ();
|
queue->qlock = g_mutex_new ();
|
||||||
queue->reader = FALSE;
|
|
||||||
queue->writer = FALSE;
|
|
||||||
queue->not_empty = g_cond_new ();
|
queue->not_empty = g_cond_new ();
|
||||||
queue->not_full = g_cond_new ();
|
queue->not_full = g_cond_new ();
|
||||||
queue->events = g_async_queue_new();
|
queue->events = g_async_queue_new();
|
||||||
|
@ -253,10 +251,11 @@ gst_queue_dispose (GObject *object)
|
||||||
{
|
{
|
||||||
GstQueue *queue = GST_QUEUE (object);
|
GstQueue *queue = GST_QUEUE (object);
|
||||||
|
|
||||||
|
gst_queue_locked_flush (queue);
|
||||||
|
|
||||||
g_mutex_free (queue->qlock);
|
g_mutex_free (queue->qlock);
|
||||||
g_cond_free (queue->not_empty);
|
g_cond_free (queue->not_empty);
|
||||||
g_cond_free (queue->not_full);
|
g_cond_free (queue->not_full);
|
||||||
gst_queue_locked_flush (queue);
|
|
||||||
g_queue_free (queue->queue);
|
g_queue_free (queue->queue);
|
||||||
|
|
||||||
g_async_queue_unref(queue->events);
|
g_async_queue_unref(queue->events);
|
||||||
|
@ -296,13 +295,14 @@ gst_queue_locked_flush (GstQueue *queue)
|
||||||
queue->level_time = G_GINT64_CONSTANT (0);
|
queue->level_time = G_GINT64_CONSTANT (0);
|
||||||
/* make sure any pending buffers to be added are flushed too */
|
/* make sure any pending buffers to be added are flushed too */
|
||||||
queue->flush = TRUE;
|
queue->flush = TRUE;
|
||||||
|
/* signal not_full, since we apparently aren't full anymore */
|
||||||
|
g_cond_signal (queue->not_full);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
gst_queue_chain (GstPad *pad, GstBuffer *buf)
|
gst_queue_chain (GstPad *pad, GstBuffer *buf)
|
||||||
{
|
{
|
||||||
GstQueue *queue;
|
GstQueue *queue;
|
||||||
gboolean reader;
|
|
||||||
|
|
||||||
g_return_if_fail (pad != NULL);
|
g_return_if_fail (pad != NULL);
|
||||||
g_return_if_fail (GST_IS_PAD (pad));
|
g_return_if_fail (GST_IS_PAD (pad));
|
||||||
|
@ -421,11 +421,7 @@ restart:
|
||||||
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d",
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d",
|
||||||
queue->level_buffers, queue->size_buffers);
|
queue->level_buffers, queue->size_buffers);
|
||||||
if (queue->writer)
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!");
|
|
||||||
queue->writer = TRUE;
|
|
||||||
g_cond_wait (queue->not_full, queue->qlock);
|
g_cond_wait (queue->not_full, queue->qlock);
|
||||||
queue->writer = FALSE;
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal");
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal");
|
||||||
}
|
}
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d",
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d",
|
||||||
|
@ -445,16 +441,10 @@ restart:
|
||||||
GST_DEBUG_PAD_NAME(pad),
|
GST_DEBUG_PAD_NAME(pad),
|
||||||
queue->level_buffers, queue->size_buffers);
|
queue->level_buffers, queue->size_buffers);
|
||||||
|
|
||||||
/* reader waiting on an empty queue */
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
|
||||||
reader = queue->reader;
|
g_cond_signal (queue->not_empty);
|
||||||
|
|
||||||
g_mutex_unlock (queue->qlock);
|
g_mutex_unlock (queue->qlock);
|
||||||
|
|
||||||
if (reader)
|
|
||||||
{
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
|
|
||||||
g_cond_signal (queue->not_empty);
|
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
out_unref:
|
out_unref:
|
||||||
|
@ -468,7 +458,6 @@ gst_queue_get (GstPad *pad)
|
||||||
GstQueue *queue;
|
GstQueue *queue;
|
||||||
GstBuffer *buf = NULL;
|
GstBuffer *buf = NULL;
|
||||||
gpointer front;
|
gpointer front;
|
||||||
gboolean writer;
|
|
||||||
|
|
||||||
g_assert(pad != NULL);
|
g_assert(pad != NULL);
|
||||||
g_assert(GST_IS_PAD(pad));
|
g_assert(GST_IS_PAD(pad));
|
||||||
|
@ -509,11 +498,8 @@ restart:
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers);
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers);
|
||||||
if (queue->reader)
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!");
|
|
||||||
queue->reader = TRUE;
|
|
||||||
|
|
||||||
//if (queue->block_timeout > -1){
|
/* if (queue->block_timeout > -1){ */
|
||||||
if (FALSE) {
|
if (FALSE) {
|
||||||
GTimeVal timeout;
|
GTimeVal timeout;
|
||||||
g_get_current_time(&timeout);
|
g_get_current_time(&timeout);
|
||||||
|
@ -527,7 +513,6 @@ restart:
|
||||||
else {
|
else {
|
||||||
g_cond_wait (queue->not_empty, queue->qlock);
|
g_cond_wait (queue->not_empty, queue->qlock);
|
||||||
}
|
}
|
||||||
queue->reader = FALSE;
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
|
||||||
}
|
}
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
|
||||||
|
@ -546,17 +531,11 @@ restart:
|
||||||
/* this assertion _has_ to hold */
|
/* this assertion _has_ to hold */
|
||||||
g_assert (queue->queue->length == queue->level_buffers);
|
g_assert (queue->queue->length == queue->level_buffers);
|
||||||
|
|
||||||
/* writer waiting on a full queue */
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full");
|
||||||
writer = queue->writer;
|
g_cond_signal (queue->not_full);
|
||||||
|
|
||||||
g_mutex_unlock (queue->qlock);
|
g_mutex_unlock (queue->qlock);
|
||||||
|
|
||||||
if (writer)
|
|
||||||
{
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full");
|
|
||||||
g_cond_signal (queue->not_full);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* FIXME where should this be? locked? */
|
/* FIXME where should this be? locked? */
|
||||||
if (GST_IS_EVENT(buf)) {
|
if (GST_IS_EVENT(buf)) {
|
||||||
GstEvent *event = GST_EVENT(buf);
|
GstEvent *event = GST_EVENT(buf);
|
||||||
|
@ -663,8 +642,7 @@ gst_queue_change_state (GstElement *element)
|
||||||
if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
|
if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_STATES, queue, "queue %s is not linked", GST_ELEMENT_NAME (queue));
|
GST_DEBUG_ELEMENT (GST_CAT_STATES, queue, "queue %s is not linked", GST_ELEMENT_NAME (queue));
|
||||||
/* FIXME can this be? */
|
/* FIXME can this be? */
|
||||||
if (queue->reader)
|
g_cond_signal (queue->not_empty);
|
||||||
g_cond_signal (queue->not_empty);
|
|
||||||
|
|
||||||
ret = GST_STATE_FAILURE;
|
ret = GST_STATE_FAILURE;
|
||||||
goto error;
|
goto error;
|
||||||
|
|
|
@ -78,9 +78,6 @@ struct _GstQueue {
|
||||||
gboolean flush;
|
gboolean flush;
|
||||||
|
|
||||||
GMutex *qlock; /* lock for queue (vs object lock) */
|
GMutex *qlock; /* lock for queue (vs object lock) */
|
||||||
/* we are single reader and single writer queue */
|
|
||||||
gboolean reader; /* reader waiting on empty queue */
|
|
||||||
gboolean writer; /* writer waiting on full queue */
|
|
||||||
GCond *not_empty; /* signals buffers now available for reading */
|
GCond *not_empty; /* signals buffers now available for reading */
|
||||||
GCond *not_full; /* signals space now available for writing */
|
GCond *not_full; /* signals space now available for writing */
|
||||||
|
|
||||||
|
|
|
@ -239,8 +239,6 @@ gst_queue_init (GstQueue *queue)
|
||||||
queue->flush = FALSE;
|
queue->flush = FALSE;
|
||||||
|
|
||||||
queue->qlock = g_mutex_new ();
|
queue->qlock = g_mutex_new ();
|
||||||
queue->reader = FALSE;
|
|
||||||
queue->writer = FALSE;
|
|
||||||
queue->not_empty = g_cond_new ();
|
queue->not_empty = g_cond_new ();
|
||||||
queue->not_full = g_cond_new ();
|
queue->not_full = g_cond_new ();
|
||||||
queue->events = g_async_queue_new();
|
queue->events = g_async_queue_new();
|
||||||
|
@ -253,10 +251,11 @@ gst_queue_dispose (GObject *object)
|
||||||
{
|
{
|
||||||
GstQueue *queue = GST_QUEUE (object);
|
GstQueue *queue = GST_QUEUE (object);
|
||||||
|
|
||||||
|
gst_queue_locked_flush (queue);
|
||||||
|
|
||||||
g_mutex_free (queue->qlock);
|
g_mutex_free (queue->qlock);
|
||||||
g_cond_free (queue->not_empty);
|
g_cond_free (queue->not_empty);
|
||||||
g_cond_free (queue->not_full);
|
g_cond_free (queue->not_full);
|
||||||
gst_queue_locked_flush (queue);
|
|
||||||
g_queue_free (queue->queue);
|
g_queue_free (queue->queue);
|
||||||
|
|
||||||
g_async_queue_unref(queue->events);
|
g_async_queue_unref(queue->events);
|
||||||
|
@ -296,13 +295,14 @@ gst_queue_locked_flush (GstQueue *queue)
|
||||||
queue->level_time = G_GINT64_CONSTANT (0);
|
queue->level_time = G_GINT64_CONSTANT (0);
|
||||||
/* make sure any pending buffers to be added are flushed too */
|
/* make sure any pending buffers to be added are flushed too */
|
||||||
queue->flush = TRUE;
|
queue->flush = TRUE;
|
||||||
|
/* signal not_full, since we apparently aren't full anymore */
|
||||||
|
g_cond_signal (queue->not_full);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
gst_queue_chain (GstPad *pad, GstBuffer *buf)
|
gst_queue_chain (GstPad *pad, GstBuffer *buf)
|
||||||
{
|
{
|
||||||
GstQueue *queue;
|
GstQueue *queue;
|
||||||
gboolean reader;
|
|
||||||
|
|
||||||
g_return_if_fail (pad != NULL);
|
g_return_if_fail (pad != NULL);
|
||||||
g_return_if_fail (GST_IS_PAD (pad));
|
g_return_if_fail (GST_IS_PAD (pad));
|
||||||
|
@ -421,11 +421,7 @@ restart:
|
||||||
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d",
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d",
|
||||||
queue->level_buffers, queue->size_buffers);
|
queue->level_buffers, queue->size_buffers);
|
||||||
if (queue->writer)
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!");
|
|
||||||
queue->writer = TRUE;
|
|
||||||
g_cond_wait (queue->not_full, queue->qlock);
|
g_cond_wait (queue->not_full, queue->qlock);
|
||||||
queue->writer = FALSE;
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal");
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal");
|
||||||
}
|
}
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d",
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d",
|
||||||
|
@ -445,16 +441,10 @@ restart:
|
||||||
GST_DEBUG_PAD_NAME(pad),
|
GST_DEBUG_PAD_NAME(pad),
|
||||||
queue->level_buffers, queue->size_buffers);
|
queue->level_buffers, queue->size_buffers);
|
||||||
|
|
||||||
/* reader waiting on an empty queue */
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
|
||||||
reader = queue->reader;
|
g_cond_signal (queue->not_empty);
|
||||||
|
|
||||||
g_mutex_unlock (queue->qlock);
|
g_mutex_unlock (queue->qlock);
|
||||||
|
|
||||||
if (reader)
|
|
||||||
{
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
|
|
||||||
g_cond_signal (queue->not_empty);
|
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
out_unref:
|
out_unref:
|
||||||
|
@ -468,7 +458,6 @@ gst_queue_get (GstPad *pad)
|
||||||
GstQueue *queue;
|
GstQueue *queue;
|
||||||
GstBuffer *buf = NULL;
|
GstBuffer *buf = NULL;
|
||||||
gpointer front;
|
gpointer front;
|
||||||
gboolean writer;
|
|
||||||
|
|
||||||
g_assert(pad != NULL);
|
g_assert(pad != NULL);
|
||||||
g_assert(GST_IS_PAD(pad));
|
g_assert(GST_IS_PAD(pad));
|
||||||
|
@ -509,11 +498,8 @@ restart:
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers);
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers);
|
||||||
if (queue->reader)
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!");
|
|
||||||
queue->reader = TRUE;
|
|
||||||
|
|
||||||
//if (queue->block_timeout > -1){
|
/* if (queue->block_timeout > -1){ */
|
||||||
if (FALSE) {
|
if (FALSE) {
|
||||||
GTimeVal timeout;
|
GTimeVal timeout;
|
||||||
g_get_current_time(&timeout);
|
g_get_current_time(&timeout);
|
||||||
|
@ -527,7 +513,6 @@ restart:
|
||||||
else {
|
else {
|
||||||
g_cond_wait (queue->not_empty, queue->qlock);
|
g_cond_wait (queue->not_empty, queue->qlock);
|
||||||
}
|
}
|
||||||
queue->reader = FALSE;
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
|
||||||
}
|
}
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
|
||||||
|
@ -546,17 +531,11 @@ restart:
|
||||||
/* this assertion _has_ to hold */
|
/* this assertion _has_ to hold */
|
||||||
g_assert (queue->queue->length == queue->level_buffers);
|
g_assert (queue->queue->length == queue->level_buffers);
|
||||||
|
|
||||||
/* writer waiting on a full queue */
|
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full");
|
||||||
writer = queue->writer;
|
g_cond_signal (queue->not_full);
|
||||||
|
|
||||||
g_mutex_unlock (queue->qlock);
|
g_mutex_unlock (queue->qlock);
|
||||||
|
|
||||||
if (writer)
|
|
||||||
{
|
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full");
|
|
||||||
g_cond_signal (queue->not_full);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* FIXME where should this be? locked? */
|
/* FIXME where should this be? locked? */
|
||||||
if (GST_IS_EVENT(buf)) {
|
if (GST_IS_EVENT(buf)) {
|
||||||
GstEvent *event = GST_EVENT(buf);
|
GstEvent *event = GST_EVENT(buf);
|
||||||
|
@ -663,8 +642,7 @@ gst_queue_change_state (GstElement *element)
|
||||||
if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
|
if (!GST_PAD_IS_LINKED (queue->sinkpad)) {
|
||||||
GST_DEBUG_ELEMENT (GST_CAT_STATES, queue, "queue %s is not linked", GST_ELEMENT_NAME (queue));
|
GST_DEBUG_ELEMENT (GST_CAT_STATES, queue, "queue %s is not linked", GST_ELEMENT_NAME (queue));
|
||||||
/* FIXME can this be? */
|
/* FIXME can this be? */
|
||||||
if (queue->reader)
|
g_cond_signal (queue->not_empty);
|
||||||
g_cond_signal (queue->not_empty);
|
|
||||||
|
|
||||||
ret = GST_STATE_FAILURE;
|
ret = GST_STATE_FAILURE;
|
||||||
goto error;
|
goto error;
|
||||||
|
|
|
@ -78,9 +78,6 @@ struct _GstQueue {
|
||||||
gboolean flush;
|
gboolean flush;
|
||||||
|
|
||||||
GMutex *qlock; /* lock for queue (vs object lock) */
|
GMutex *qlock; /* lock for queue (vs object lock) */
|
||||||
/* we are single reader and single writer queue */
|
|
||||||
gboolean reader; /* reader waiting on empty queue */
|
|
||||||
gboolean writer; /* writer waiting on full queue */
|
|
||||||
GCond *not_empty; /* signals buffers now available for reading */
|
GCond *not_empty; /* signals buffers now available for reading */
|
||||||
GCond *not_full; /* signals space now available for writing */
|
GCond *not_full; /* signals space now available for writing */
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue