queue2: improve buffer level measurement in download mode

Keep track of the current buffer level in the current range in download mode so
that we post the correct buffering messages.
This commit is contained in:
Wim Taymans 2010-03-25 17:21:02 +01:00
parent 1a72c2f01b
commit 1f34682644

View file

@ -882,6 +882,20 @@ update_out_rates (GstQueue2 * queue)
queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
}
static void
update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
{
guint64 max_reading_pos, writing_pos;
writing_pos = range->writing_pos;
max_reading_pos = range->max_reading_pos;
if (writing_pos > max_reading_pos)
queue->cur_level.bytes = writing_pos - max_reading_pos;
else
queue->cur_level.bytes = 0;
}
static void
gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
{
@ -912,6 +926,10 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
}
writing_pos += size;
GST_INFO_OBJECT (queue,
"writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT,
writing_pos, max_reading_pos);
if (writing_pos > max_reading_pos)
queue->cur_level.bytes = writing_pos - max_reading_pos;
else
@ -919,9 +937,9 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
/* try to merge with next range */
while ((next = queue->current->next)) {
GST_DEBUG_OBJECT (queue,
"cheking %" G_GUINT64_FORMAT " < %" G_GUINT64_FORMAT, writing_pos,
next->offset);
GST_INFO_OBJECT (queue,
"checking merge with next range %" G_GUINT64_FORMAT " < %"
G_GUINT64_FORMAT, writing_pos, next->offset);
if (writing_pos < next->offset)
break;
@ -939,6 +957,21 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
queue->current->writing_pos = writing_pos;
}
static void
update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
{
guint64 reading_pos, max_reading_pos;
reading_pos = pos;
max_reading_pos = range->max_reading_pos;
max_reading_pos = MAX (max_reading_pos, reading_pos);
range->max_reading_pos = max_reading_pos;
update_cur_level (queue, range);
}
static gboolean
perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
{
@ -958,6 +991,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
if (res) {
queue->current = add_range (queue, offset);
/* update the stats for this range */
update_cur_level (queue, queue->current);
}
return res;
}
@ -968,14 +1003,18 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
{
GstQueue2Range *range;
GST_DEBUG_OBJECT (queue, "offset %" G_GUINT64_FORMAT ", len %u", offset,
length);
GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
offset, length);
if ((range = find_range (queue, offset, length))) {
if (queue->current != range) {
GST_DEBUG_OBJECT (queue, "switching ranges");
GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
perform_seek_to_offset (queue, range->writing_pos);
}
/* update the current reading position in the range */
update_cur_pos (queue, queue->current, offset + length);
/* we have a range for offset */
GST_DEBUG_OBJECT (queue,
"we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
@ -988,10 +1027,16 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
return TRUE;
} else {
GST_INFO_OBJECT (queue, "not found in any range");
/* we don't have the range, see how far away we are, FIXME, find a good
* threshold based on the incomming rate. */
if (queue->current && offset < queue->current->writing_pos + 200000)
return FALSE;
if (queue->current) {
if (offset < queue->current->writing_pos + 200000) {
update_cur_pos (queue, queue->current, offset + length);
GST_INFO_OBJECT (queue, "wait for data");
return FALSE;
}
}
/* too far away, do a seek */
perform_seek_to_offset (queue, offset);
@ -1006,7 +1051,6 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
{
size_t res;
GstBuffer *buf;
guint64 reading_pos, max_reading_pos, writing_pos;
/* check if we have enough data at @offset. If there is not enough data, we
* block and wait. */
@ -1049,21 +1093,6 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
*buffer = buf;
reading_pos = queue->current->reading_pos;
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
reading_pos = offset + length;
max_reading_pos = MAX (max_reading_pos, reading_pos);
if (writing_pos > max_reading_pos)
queue->cur_level.bytes = writing_pos - max_reading_pos;
else
queue->cur_level.bytes = 0;
queue->current->reading_pos = reading_pos;
queue->current->max_reading_pos = max_reading_pos;
return GST_FLOW_OK;
/* ERRORS */
@ -1277,8 +1306,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
size = GST_BUFFER_SIZE (buffer);
/* add buffer to the statistics */
queue->cur_level.buffers++;
queue->cur_level.bytes += size;
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
queue->cur_level.buffers++;
queue->cur_level.bytes += size;
}
queue->bytes_in += size;
/* apply new buffer to segment stats */
@ -1382,9 +1413,12 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved buffer %p from queue", buffer);
queue->cur_level.buffers--;
queue->cur_level.bytes -= size;
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
queue->cur_level.buffers--;
queue->cur_level.bytes -= size;
}
queue->bytes_out += size;
apply_buffer (queue, buffer, &queue->src_segment);
/* update the byterate stats */
update_out_rates (queue);
@ -1854,6 +1888,8 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
/* now unblock the getrange function */
GST_QUEUE2_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_OK;
if (queue->current)
queue->current->max_reading_pos = 0;
GST_QUEUE2_MUTEX_UNLOCK (queue);
/* when using a temp file, we eat the event */