Original commit message from CVS:
Add patch for better buffering (especially useful for network streaming), see #108268 - kudos go to janzen@pixelmetrix.com
This commit is contained in:
Ronald S. Bultje 2003-05-30 23:20:02 +00:00
parent a7b3634ddd
commit ed086d5f79
4 changed files with 66 additions and 34 deletions

View file

@ -67,6 +67,7 @@ enum {
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
ARG_MIN_THRESHOLD_BYTES,
ARG_MAY_DEADLOCK,
ARG_BLOCK_TIMEOUT,
};
@ -160,6 +161,10 @@ gst_queue_class_init (GstQueueClass *klass)
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
0, G_MAXINT, 100, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MIN_THRESHOLD_BYTES,
g_param_spec_int ("min_threshold_bytes", "Minimum Threshold",
"Minimum bytes required before signalling not_empty to reader.",
0, G_MAXINT, 0, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
TRUE, G_PARAM_READWRITE));
@ -235,8 +240,9 @@ gst_queue_init (GstQueue *queue)
queue->level_bytes = 0;
queue->level_time = G_GINT64_CONSTANT (0);
queue->size_buffers = 100; /* 100 buffers */
queue->size_bytes = 100 * 1024; /* 100KB */
queue->size_time = G_GINT64_CONSTANT (1000000000); /* 1sec */
queue->size_bytes = 100 * 1024; /* 100KB */
queue->size_time = GST_SECOND; /* 1sec */
queue->min_threshold_bytes = 0;
queue->may_deadlock = TRUE;
queue->block_timeout = -1;
queue->interrupt = FALSE;
@ -366,7 +372,6 @@ restart:
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(buf)));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
/* now we have to clean up and exit right away */
g_mutex_unlock (queue->qlock);
goto out_unref;
@ -392,8 +397,8 @@ restart:
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d",
queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
while (queue->level_buffers == queue->size_buffers) {
/* if there's a pending state change for this queue or its manager, switch */
@ -403,7 +408,7 @@ restart:
g_mutex_unlock (queue->qlock);
if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue)))
goto out_unref;
/* if we got here bacause we were unlocked after a flush, we don't need
/* if we got here because we were unlocked after a flush, we don't need
* to add the buffer to the queue again */
if (queue->flush) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush");
@ -427,13 +432,13 @@ restart:
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d",
queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
g_cond_wait (queue->not_full, queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal");
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d",
queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
}
/* put the buffer on the tail of the list */
@ -445,9 +450,9 @@ restart:
/* this assertion _has_ to hold */
g_assert (queue->queue->length == queue->level_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d",
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d buffers, %d bytes",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
queue->level_buffers, queue->size_buffers, queue->level_bytes);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
g_cond_signal (queue->not_empty);
@ -480,7 +485,8 @@ restart:
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
while (queue->level_buffers == 0) {
/* if there's a pending state change for this queue or its manager, switch
* back to iterator so bottom half of state change executes
@ -505,7 +511,8 @@ restart:
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
/* if (queue->block_timeout > -1){ */
if (FALSE) {
@ -523,7 +530,8 @@ restart:
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
front = g_queue_pop_head (queue->queue);
buf = (GstBuffer *)(front);
@ -532,9 +540,9 @@ restart:
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d",
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
queue->level_buffers, queue->size_buffers, queue->level_bytes);
/* this assertion _has_ to hold */
g_assert (queue->queue->length == queue->level_buffers);
@ -710,6 +718,9 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa
case ARG_MAX_LEVEL:
queue->size_buffers = g_value_get_int (value);
break;
case ARG_MIN_THRESHOLD_BYTES:
queue->min_threshold_bytes = g_value_get_int (value);
break;
case ARG_MAY_DEADLOCK:
queue->may_deadlock = g_value_get_boolean (value);
break;
@ -742,6 +753,9 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
case ARG_MAX_LEVEL:
g_value_set_int (value, queue->size_buffers);
break;
case ARG_MIN_THRESHOLD_BYTES:
g_value_set_int (value, queue->min_threshold_bytes);
break;
case ARG_MAY_DEADLOCK:
g_value_set_boolean (value, queue->may_deadlock);
break;

View file

@ -73,6 +73,8 @@ struct _GstQueue {
gint leaky; /* whether the queue is leaky, and if so at which end */
gint block_timeout; /* microseconds until a blocked queue times out and returns GST_EVENT_FILLER.
* A value of -1 will block forever. */
guint min_threshold_bytes; /* the minimum number of bytes required before
* waking up the reader thread */
gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
gboolean interrupt;
gboolean flush;

View file

@ -67,6 +67,7 @@ enum {
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
ARG_MIN_THRESHOLD_BYTES,
ARG_MAY_DEADLOCK,
ARG_BLOCK_TIMEOUT,
};
@ -160,6 +161,10 @@ gst_queue_class_init (GstQueueClass *klass)
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
0, G_MAXINT, 100, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MIN_THRESHOLD_BYTES,
g_param_spec_int ("min_threshold_bytes", "Minimum Threshold",
"Minimum bytes required before signalling not_empty to reader.",
0, G_MAXINT, 0, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
TRUE, G_PARAM_READWRITE));
@ -235,8 +240,9 @@ gst_queue_init (GstQueue *queue)
queue->level_bytes = 0;
queue->level_time = G_GINT64_CONSTANT (0);
queue->size_buffers = 100; /* 100 buffers */
queue->size_bytes = 100 * 1024; /* 100KB */
queue->size_time = G_GINT64_CONSTANT (1000000000); /* 1sec */
queue->size_bytes = 100 * 1024; /* 100KB */
queue->size_time = GST_SECOND; /* 1sec */
queue->min_threshold_bytes = 0;
queue->may_deadlock = TRUE;
queue->block_timeout = -1;
queue->interrupt = FALSE;
@ -366,7 +372,6 @@ restart:
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(buf)));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
/* now we have to clean up and exit right away */
g_mutex_unlock (queue->qlock);
goto out_unref;
@ -392,8 +397,8 @@ restart:
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d",
queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
while (queue->level_buffers == queue->size_buffers) {
/* if there's a pending state change for this queue or its manager, switch */
@ -403,7 +408,7 @@ restart:
g_mutex_unlock (queue->qlock);
if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue)))
goto out_unref;
/* if we got here bacause we were unlocked after a flush, we don't need
/* if we got here because we were unlocked after a flush, we don't need
* to add the buffer to the queue again */
if (queue->flush) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush");
@ -427,13 +432,13 @@ restart:
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d",
queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
g_cond_wait (queue->not_full, queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal");
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d",
queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
}
/* put the buffer on the tail of the list */
@ -445,9 +450,9 @@ restart:
/* this assertion _has_ to hold */
g_assert (queue->queue->length == queue->level_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d",
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d buffers, %d bytes",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
queue->level_buffers, queue->size_buffers, queue->level_bytes);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
g_cond_signal (queue->not_empty);
@ -480,7 +485,8 @@ restart:
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
while (queue->level_buffers == 0) {
/* if there's a pending state change for this queue or its manager, switch
* back to iterator so bottom half of state change executes
@ -505,7 +511,8 @@ restart:
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
/* if (queue->block_timeout > -1){ */
if (FALSE) {
@ -523,7 +530,8 @@ restart:
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes);
front = g_queue_pop_head (queue->queue);
buf = (GstBuffer *)(front);
@ -532,9 +540,9 @@ restart:
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d",
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
queue->level_buffers, queue->size_buffers, queue->level_bytes);
/* this assertion _has_ to hold */
g_assert (queue->queue->length == queue->level_buffers);
@ -710,6 +718,9 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa
case ARG_MAX_LEVEL:
queue->size_buffers = g_value_get_int (value);
break;
case ARG_MIN_THRESHOLD_BYTES:
queue->min_threshold_bytes = g_value_get_int (value);
break;
case ARG_MAY_DEADLOCK:
queue->may_deadlock = g_value_get_boolean (value);
break;
@ -742,6 +753,9 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
case ARG_MAX_LEVEL:
g_value_set_int (value, queue->size_buffers);
break;
case ARG_MIN_THRESHOLD_BYTES:
g_value_set_int (value, queue->min_threshold_bytes);
break;
case ARG_MAY_DEADLOCK:
g_value_set_boolean (value, queue->may_deadlock);
break;

View file

@ -73,6 +73,8 @@ struct _GstQueue {
gint leaky; /* whether the queue is leaky, and if so at which end */
gint block_timeout; /* microseconds until a blocked queue times out and returns GST_EVENT_FILLER.
* A value of -1 will block forever. */
guint min_threshold_bytes; /* the minimum number of bytes required before
* waking up the reader thread */
gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
gboolean interrupt;
gboolean flush;