mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-10-02 16:52:42 +00:00
plugins/elements/gstmultiqueue.*: revert extra-size-buffers stuff, caused some race conditions and extra-size-buffers...
Original commit message from CVS: * plugins/elements/gstmultiqueue.c: * plugins/elements/gstmultiqueue.h: revert extra-size-buffers stuff, caused some race conditions and extra-size-buffers is not used anymore. Docs needs some updates
This commit is contained in:
parent
f59ff633bc
commit
6d4dc1a807
3 changed files with 79 additions and 79 deletions
|
@ -1,3 +1,10 @@
|
||||||
|
2008-06-26 Thijs Vermeir <thijsvermeir@gmail.com>
|
||||||
|
|
||||||
|
* plugins/elements/gstmultiqueue.c:
|
||||||
|
* plugins/elements/gstmultiqueue.h:
|
||||||
|
revert extra-size-buffers stuff, caused some race conditions
|
||||||
|
and extra-size-buffers is not used anymore. Docs needs some updates
|
||||||
|
|
||||||
2008-06-26 Tim-Philipp Müller <tim.muller at collabora co uk>
|
2008-06-26 Tim-Philipp Müller <tim.muller at collabora co uk>
|
||||||
|
|
||||||
* win32/common/config.h:
|
* win32/common/config.h:
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
* Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
|
* Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
|
||||||
* Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
|
* Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
|
||||||
* Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
|
* Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
|
||||||
* Copyright (C) 2008 Thijs Vermeir <thijsvermeir@gmail.com>
|
|
||||||
*
|
*
|
||||||
* gstmultiqueue.c:
|
* gstmultiqueue.c:
|
||||||
*
|
*
|
||||||
|
@ -219,11 +218,11 @@ enum
|
||||||
};
|
};
|
||||||
|
|
||||||
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
|
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
|
||||||
g_mutex_lock (q->qlock); \
|
g_mutex_lock (q->qlock); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
|
#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
|
||||||
g_mutex_unlock (q->qlock); \
|
g_mutex_unlock (q->qlock); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
static void gst_multi_queue_finalize (GObject * object);
|
static void gst_multi_queue_finalize (GObject * object);
|
||||||
|
@ -253,8 +252,7 @@ gst_multi_queue_base_init (gpointer g_class)
|
||||||
|
|
||||||
gst_element_class_set_details_simple (gstelement_class,
|
gst_element_class_set_details_simple (gstelement_class,
|
||||||
"MultiQueue",
|
"MultiQueue",
|
||||||
"Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>\n"
|
"Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
|
||||||
"Thijs Vermeir <thijsvermeir@gmail.com>");
|
|
||||||
gst_element_class_add_pad_template (gstelement_class,
|
gst_element_class_add_pad_template (gstelement_class,
|
||||||
gst_static_pad_template_get (&sinktemplate));
|
gst_static_pad_template_get (&sinktemplate));
|
||||||
gst_element_class_add_pad_template (gstelement_class,
|
gst_element_class_add_pad_template (gstelement_class,
|
||||||
|
@ -381,14 +379,13 @@ gst_multi_queue_finalize (GObject * object)
|
||||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \
|
#define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \
|
||||||
GList * tmp = mq->queues; \
|
GList * tmp = mq->queues; \
|
||||||
while (tmp) { \
|
while (tmp) { \
|
||||||
GstSingleQueue *q = (GstSingleQueue*)tmp->data; \
|
GstSingleQueue *q = (GstSingleQueue*)tmp->data; \
|
||||||
q->max_size.format = mq->max_size.format; \
|
q->max_size.format = mq->max_size.format; \
|
||||||
q->extra_size.format = mq->extra_size.format; \
|
tmp = g_list_next(tmp); \
|
||||||
tmp = g_list_next(tmp); \
|
}; \
|
||||||
}; \
|
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -397,37 +394,38 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
|
||||||
{
|
{
|
||||||
GstMultiQueue *mq = GST_MULTI_QUEUE (object);
|
GstMultiQueue *mq = GST_MULTI_QUEUE (object);
|
||||||
|
|
||||||
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_MAX_SIZE_BYTES:
|
case ARG_MAX_SIZE_BYTES:
|
||||||
|
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||||
mq->max_size.bytes = g_value_get_uint (value);
|
mq->max_size.bytes = g_value_get_uint (value);
|
||||||
SET_CHILD_PROPERTY (mq, bytes);
|
SET_CHILD_PROPERTY (mq, bytes);
|
||||||
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
break;
|
break;
|
||||||
case ARG_MAX_SIZE_BUFFERS:
|
case ARG_MAX_SIZE_BUFFERS:
|
||||||
|
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||||
mq->max_size.visible = g_value_get_uint (value);
|
mq->max_size.visible = g_value_get_uint (value);
|
||||||
SET_CHILD_PROPERTY (mq, visible);
|
SET_CHILD_PROPERTY (mq, visible);
|
||||||
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
break;
|
break;
|
||||||
case ARG_MAX_SIZE_TIME:
|
case ARG_MAX_SIZE_TIME:
|
||||||
|
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||||
mq->max_size.time = g_value_get_uint64 (value);
|
mq->max_size.time = g_value_get_uint64 (value);
|
||||||
SET_CHILD_PROPERTY (mq, time);
|
SET_CHILD_PROPERTY (mq, time);
|
||||||
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
break;
|
break;
|
||||||
case ARG_EXTRA_SIZE_BYTES:
|
case ARG_EXTRA_SIZE_BYTES:
|
||||||
mq->extra_size.bytes = g_value_get_uint (value);
|
mq->extra_size.bytes = g_value_get_uint (value);
|
||||||
SET_CHILD_PROPERTY (mq, bytes);
|
|
||||||
break;
|
break;
|
||||||
case ARG_EXTRA_SIZE_BUFFERS:
|
case ARG_EXTRA_SIZE_BUFFERS:
|
||||||
mq->extra_size.visible = g_value_get_uint (value);
|
mq->extra_size.visible = g_value_get_uint (value);
|
||||||
SET_CHILD_PROPERTY (mq, visible);
|
|
||||||
break;
|
break;
|
||||||
case ARG_EXTRA_SIZE_TIME:
|
case ARG_EXTRA_SIZE_TIME:
|
||||||
mq->extra_size.time = g_value_get_uint64 (value);
|
mq->extra_size.time = g_value_get_uint64 (value);
|
||||||
SET_CHILD_PROPERTY (mq, time);
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -503,6 +501,7 @@ no_parent:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GstElement methods
|
* GstElement methods
|
||||||
*/
|
*/
|
||||||
|
@ -601,7 +600,6 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
|
||||||
sq->srcresult = GST_FLOW_OK;
|
sq->srcresult = GST_FLOW_OK;
|
||||||
sq->cur_time = 0;
|
sq->cur_time = 0;
|
||||||
sq->max_size.visible = mq->max_size.visible;
|
sq->max_size.visible = mq->max_size.visible;
|
||||||
sq->extra_size.visible = mq->extra_size.visible;
|
|
||||||
sq->is_eos = FALSE;
|
sq->is_eos = FALSE;
|
||||||
sq->inextra = FALSE;
|
sq->inextra = FALSE;
|
||||||
sq->nextid = 0;
|
sq->nextid = 0;
|
||||||
|
@ -1272,40 +1270,6 @@ compute_high_id (GstMultiQueue * mq)
|
||||||
#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
|
#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
|
||||||
(sq->max_size.format) <= (value))
|
(sq->max_size.format) <= (value))
|
||||||
|
|
||||||
static void
|
|
||||||
single_queue_overrun_cb_unlocked (GstDataQueue * dq, GstSingleQueue * sq)
|
|
||||||
{
|
|
||||||
GstMultiQueue *mq = sq->mqueue;
|
|
||||||
GList *tmp;
|
|
||||||
GstDataQueueSize size;
|
|
||||||
|
|
||||||
gst_data_queue_get_level (sq->queue, &size);
|
|
||||||
mq->filled = FALSE;
|
|
||||||
|
|
||||||
/* if we have reached max visible we can maybe bump this
|
|
||||||
* if another queue is empty, skip this if we can't grow anymore
|
|
||||||
*/
|
|
||||||
if (IS_FILLED (visible, size.visible) && sq->extra_size.visible > 0) {
|
|
||||||
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
|
|
||||||
GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
|
|
||||||
|
|
||||||
if (ssq == sq)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
if (gst_data_queue_is_empty (ssq->queue)) {
|
|
||||||
sq->max_size.visible++;
|
|
||||||
sq->extra_size.visible--;
|
|
||||||
GST_DEBUG_OBJECT (mq,
|
|
||||||
"queue %d is empty, bumping single queue %d max visible to %d",
|
|
||||||
ssq->id, sq->id, sq->max_size.visible);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* check if the queue is still full */
|
|
||||||
mq->filled = gst_data_queue_is_full (sq->queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GstSingleQueue functions
|
* GstSingleQueue functions
|
||||||
*/
|
*/
|
||||||
|
@ -1313,26 +1277,63 @@ static void
|
||||||
single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
|
single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
|
||||||
{
|
{
|
||||||
GstMultiQueue *mq = sq->mqueue;
|
GstMultiQueue *mq = sq->mqueue;
|
||||||
|
GList *tmp;
|
||||||
|
GstDataQueueSize size;
|
||||||
|
gboolean filled = FALSE;
|
||||||
|
|
||||||
|
gst_data_queue_get_level (sq->queue, &size);
|
||||||
|
|
||||||
GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
|
GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
|
||||||
|
|
||||||
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||||
|
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
|
||||||
|
GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
|
||||||
|
GstDataQueueSize ssize;
|
||||||
|
|
||||||
single_queue_overrun_cb_unlocked (dq, sq);
|
GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
|
||||||
|
|
||||||
|
if (gst_data_queue_is_empty (ssq->queue)) {
|
||||||
|
GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
|
||||||
|
if (IS_FILLED (visible, size.visible)) {
|
||||||
|
sq->max_size.visible = size.visible + 1;
|
||||||
|
GST_DEBUG_OBJECT (mq,
|
||||||
|
"Another queue is empty, bumping single queue %d max visible to %d",
|
||||||
|
sq->id, sq->max_size.visible);
|
||||||
|
}
|
||||||
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
|
goto beach;
|
||||||
|
}
|
||||||
|
/* check if we reached the hard time/bytes limits */
|
||||||
|
gst_data_queue_get_level (ssq->queue, &ssize);
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (mq,
|
||||||
|
"queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
|
||||||
|
G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
|
||||||
|
ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
|
||||||
|
|
||||||
|
/* if this queue is filled completely we must signal overrun */
|
||||||
|
if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
|
||||||
|
GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
|
||||||
|
filled = TRUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* no queues were empty */
|
||||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
|
|
||||||
if (mq->filled) {
|
/* Overrun is always forwarded, since this is blocking the upstream element */
|
||||||
|
if (filled) {
|
||||||
GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
|
GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
|
||||||
g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
|
g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
|
||||||
mq->filled = FALSE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
beach:
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
|
single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
|
||||||
{
|
{
|
||||||
gboolean all_empty = TRUE;
|
gboolean empty = TRUE;
|
||||||
GstMultiQueue *mq = sq->mqueue;
|
GstMultiQueue *mq = sq->mqueue;
|
||||||
GList *tmp;
|
GList *tmp;
|
||||||
|
|
||||||
|
@ -1341,36 +1342,29 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
|
||||||
|
|
||||||
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
|
||||||
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
|
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
|
||||||
GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
|
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
|
||||||
|
|
||||||
if (sq == ssq)
|
if (gst_data_queue_is_full (sq->queue)) {
|
||||||
continue;
|
GstDataQueueSize size;
|
||||||
|
|
||||||
/* prevent data starvation */
|
gst_data_queue_get_level (sq->queue, &size);
|
||||||
if (gst_data_queue_is_full (ssq->queue)) {
|
if (IS_FILLED (visible, size.visible)) {
|
||||||
single_queue_overrun_cb_unlocked (dq, ssq);
|
sq->max_size.visible = size.visible + 1;
|
||||||
goto check_filled;
|
GST_DEBUG_OBJECT (mq,
|
||||||
}
|
"queue %d is filled, bumping its max visible to %d", sq->id,
|
||||||
|
sq->max_size.visible);
|
||||||
if (!gst_data_queue_is_empty (ssq->queue)) {
|
gst_data_queue_limits_changed (sq->queue);
|
||||||
all_empty = FALSE;
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
if (!gst_data_queue_is_empty (sq->queue))
|
||||||
|
empty = FALSE;
|
||||||
}
|
}
|
||||||
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
|
||||||
|
|
||||||
if (all_empty) {
|
if (empty) {
|
||||||
GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
|
GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
|
||||||
g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
|
g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
|
|
||||||
check_filled:
|
|
||||||
if (mq->filled) {
|
|
||||||
GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
|
|
||||||
g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
|
|
||||||
mq->filled = FALSE;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
|
@ -1448,7 +1442,7 @@ gst_single_queue_new (GstMultiQueue * mqueue)
|
||||||
sq->oldid = 0;
|
sq->oldid = 0;
|
||||||
sq->turn = g_cond_new ();
|
sq->turn = g_cond_new ();
|
||||||
|
|
||||||
/* attach to underrun/overrun signals to handle non-starvation */
|
/* attach to underrun/overrun signals to handle non-starvation */
|
||||||
g_signal_connect (G_OBJECT (sq->queue), "full",
|
g_signal_connect (G_OBJECT (sq->queue), "full",
|
||||||
G_CALLBACK (single_queue_overrun_cb), sq);
|
G_CALLBACK (single_queue_overrun_cb), sq);
|
||||||
g_signal_connect (G_OBJECT (sq->queue), "empty",
|
g_signal_connect (G_OBJECT (sq->queue), "empty",
|
||||||
|
|
|
@ -68,7 +68,6 @@ struct _GstMultiQueue {
|
||||||
gint nextnotlinked; /* ID of the next queue not linked (-1 : none) */
|
gint nextnotlinked; /* ID of the next queue not linked (-1 : none) */
|
||||||
|
|
||||||
gint numwaiting; /* number of not-linked pads waiting */
|
gint numwaiting; /* number of not-linked pads waiting */
|
||||||
gboolean filled; /* overrun detected */
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct _GstMultiQueueClass {
|
struct _GstMultiQueueClass {
|
||||||
|
|
Loading…
Reference in a new issue