diff --git a/gst/gstqueue.c b/gst/gstqueue.c index b41ef3fcbe..c01deb3cd4 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -67,6 +67,7 @@ enum { ARG_LEAKY, ARG_LEVEL, ARG_MAX_LEVEL, + ARG_MIN_THRESHOLD_BYTES, ARG_MAY_DEADLOCK, ARG_BLOCK_TIMEOUT, }; @@ -160,6 +161,10 @@ gst_queue_class_init (GstQueueClass *klass) g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL, g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.", 0, G_MAXINT, 100, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MIN_THRESHOLD_BYTES, + g_param_spec_int ("min_threshold_bytes", "Minimum Threshold", + "Minimum bytes required before signalling not_empty to reader.", + 0, G_MAXINT, 0, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK, g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", TRUE, G_PARAM_READWRITE)); @@ -235,8 +240,9 @@ gst_queue_init (GstQueue *queue) queue->level_bytes = 0; queue->level_time = G_GINT64_CONSTANT (0); queue->size_buffers = 100; /* 100 buffers */ - queue->size_bytes = 100 * 1024; /* 100KB */ - queue->size_time = G_GINT64_CONSTANT (1000000000); /* 1sec */ + queue->size_bytes = 100 * 1024; /* 100KB */ + queue->size_time = GST_SECOND; /* 1sec */ + queue->min_threshold_bytes = 0; queue->may_deadlock = TRUE; queue->block_timeout = -1; queue->interrupt = FALSE; @@ -366,7 +372,6 @@ restart: fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", GST_ELEMENT_NAME(GST_ELEMENT(queue)), GST_EVENT_TYPE(GST_EVENT(buf))); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end"); /* now we have to clean up and exit right away */ g_mutex_unlock (queue->qlock); goto out_unref; @@ -392,8 +397,8 @@ restart: } } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d", - queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); while (queue->level_buffers == queue->size_buffers) { /* if there's a pending state change for this queue or its manager, switch */ @@ -403,7 +408,7 @@ restart: g_mutex_unlock (queue->qlock); if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue))) goto out_unref; - /* if we got here bacause we were unlocked after a flush, we don't need + /* if we got here because we were unlocked after a flush, we don't need * to add the buffer to the queue again */ if (queue->flush) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush"); @@ -427,13 +432,13 @@ restart: } } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d", - queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); g_cond_wait (queue->not_full, queue->qlock); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal"); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d", - queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); } /* put the buffer on the tail of the list */ @@ -445,9 +450,9 @@ restart: /* this assertion _has_ to hold */ g_assert (queue->queue->length == queue->level_buffers); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d", + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d buffers, %d bytes", GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); + queue->level_buffers, queue->size_buffers, queue->level_bytes); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty"); g_cond_signal (queue->not_empty); @@ -480,7 +485,8 @@ restart: g_mutex_lock (queue->qlock); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); while (queue->level_buffers == 0) { /* if there's a pending state change for this queue or its manager, switch * back to iterator so bottom half of state change executes @@ -505,7 +511,8 @@ restart: } } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); /* if (queue->block_timeout > -1){ */ if (FALSE) { @@ -523,7 +530,8 @@ restart: } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal"); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); front = g_queue_pop_head (queue->queue); buf = (GstBuffer *)(front); @@ -532,9 +540,9 @@ restart: queue->level_buffers--; queue->level_bytes -= GST_BUFFER_SIZE(buf); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d", + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes", GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); + queue->level_buffers, queue->size_buffers, queue->level_bytes); /* this assertion _has_ to hold */ g_assert (queue->queue->length == queue->level_buffers); @@ -710,6 +718,9 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa case ARG_MAX_LEVEL: queue->size_buffers = g_value_get_int (value); break; + case ARG_MIN_THRESHOLD_BYTES: + queue->min_threshold_bytes = g_value_get_int (value); + break; case ARG_MAY_DEADLOCK: queue->may_deadlock = g_value_get_boolean (value); break; @@ -742,6 +753,9 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe case ARG_MAX_LEVEL: g_value_set_int (value, queue->size_buffers); break; + case ARG_MIN_THRESHOLD_BYTES: + g_value_set_int (value, queue->min_threshold_bytes); + break; case ARG_MAY_DEADLOCK: g_value_set_boolean (value, queue->may_deadlock); break; diff --git a/gst/gstqueue.h b/gst/gstqueue.h index 8438fe4fad..2cd55f398d 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -73,6 +73,8 @@ struct _GstQueue { gint leaky; /* whether the queue is leaky, and if so at which end */ gint block_timeout; /* microseconds until a blocked queue times out and returns GST_EVENT_FILLER. * A value of -1 will block forever. */ + guint min_threshold_bytes; /* the minimum number of bytes required before + * waking up the reader thread */ gboolean may_deadlock; /* it the queue should fail on possible deadlocks */ gboolean interrupt; gboolean flush; diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index b41ef3fcbe..c01deb3cd4 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -67,6 +67,7 @@ enum { ARG_LEAKY, ARG_LEVEL, ARG_MAX_LEVEL, + ARG_MIN_THRESHOLD_BYTES, ARG_MAY_DEADLOCK, ARG_BLOCK_TIMEOUT, }; @@ -160,6 +161,10 @@ gst_queue_class_init (GstQueueClass *klass) g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL, g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.", 0, G_MAXINT, 100, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MIN_THRESHOLD_BYTES, + g_param_spec_int ("min_threshold_bytes", "Minimum Threshold", + "Minimum bytes required before signalling not_empty to reader.", + 0, G_MAXINT, 0, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK, g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", TRUE, G_PARAM_READWRITE)); @@ -235,8 +240,9 @@ gst_queue_init (GstQueue *queue) queue->level_bytes = 0; queue->level_time = G_GINT64_CONSTANT (0); queue->size_buffers = 100; /* 100 buffers */ - queue->size_bytes = 100 * 1024; /* 100KB */ - queue->size_time = G_GINT64_CONSTANT (1000000000); /* 1sec */ + queue->size_bytes = 100 * 1024; /* 100KB */ + queue->size_time = GST_SECOND; /* 1sec */ + queue->min_threshold_bytes = 0; queue->may_deadlock = TRUE; queue->block_timeout = -1; queue->interrupt = FALSE; @@ -366,7 +372,6 @@ restart: fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", GST_ELEMENT_NAME(GST_ELEMENT(queue)), GST_EVENT_TYPE(GST_EVENT(buf))); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end"); /* now we have to clean up and exit right away */ g_mutex_unlock (queue->qlock); goto out_unref; @@ -392,8 +397,8 @@ restart: } } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d", - queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); while (queue->level_buffers == queue->size_buffers) { /* if there's a pending state change for this queue or its manager, switch */ @@ -403,7 +408,7 @@ restart: g_mutex_unlock (queue->qlock); if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue))) goto out_unref; - /* if we got here bacause we were unlocked after a flush, we don't need + /* if we got here because we were unlocked after a flush, we don't need * to add the buffer to the queue again */ if (queue->flush) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush"); @@ -427,13 +432,13 @@ restart: } } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d", - queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); g_cond_wait (queue->not_full, queue->qlock); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal"); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d", - queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); } /* put the buffer on the tail of the list */ @@ -445,9 +450,9 @@ restart: /* this assertion _has_ to hold */ g_assert (queue->queue->length == queue->level_buffers); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d", + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d buffers, %d bytes", GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); + queue->level_buffers, queue->size_buffers, queue->level_bytes); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty"); g_cond_signal (queue->not_empty); @@ -480,7 +485,8 @@ restart: g_mutex_lock (queue->qlock); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); while (queue->level_buffers == 0) { /* if there's a pending state change for this queue or its manager, switch * back to iterator so bottom half of state change executes @@ -505,7 +511,8 @@ restart: } } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); /* if (queue->block_timeout > -1){ */ if (FALSE) { @@ -523,7 +530,8 @@ restart: } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal"); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d buffers, %d bytes", + queue->level_buffers, queue->size_buffers, queue->level_bytes); front = g_queue_pop_head (queue->queue); buf = (GstBuffer *)(front); @@ -532,9 +540,9 @@ restart: queue->level_buffers--; queue->level_bytes -= GST_BUFFER_SIZE(buf); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d", + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes", GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); + queue->level_buffers, queue->size_buffers, queue->level_bytes); /* this assertion _has_ to hold */ g_assert (queue->queue->length == queue->level_buffers); @@ -710,6 +718,9 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa case ARG_MAX_LEVEL: queue->size_buffers = g_value_get_int (value); break; + case ARG_MIN_THRESHOLD_BYTES: + queue->min_threshold_bytes = g_value_get_int (value); + break; case ARG_MAY_DEADLOCK: queue->may_deadlock = g_value_get_boolean (value); break; @@ -742,6 +753,9 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe case ARG_MAX_LEVEL: g_value_set_int (value, queue->size_buffers); break; + case ARG_MIN_THRESHOLD_BYTES: + g_value_set_int (value, queue->min_threshold_bytes); + break; case ARG_MAY_DEADLOCK: g_value_set_boolean (value, queue->may_deadlock); break; diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 8438fe4fad..2cd55f398d 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -73,6 +73,8 @@ struct _GstQueue { gint leaky; /* whether the queue is leaky, and if so at which end */ gint block_timeout; /* microseconds until a blocked queue times out and returns GST_EVENT_FILLER. * A value of -1 will block forever. */ + guint min_threshold_bytes; /* the minimum number of bytes required before + * waking up the reader thread */ gboolean may_deadlock; /* it the queue should fail on possible deadlocks */ gboolean interrupt; gboolean flush;