From 1f34682644019d9af70e1dcdeb6e61d86c670dc9 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 25 Mar 2010 17:21:02 +0100 Subject: [PATCH] queue2: improve buffer level measurement in download mode Keep track of the current buffer level in the current range in download mode so that we post the correct buffering messages. --- plugins/elements/gstqueue2.c | 92 +++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 28 deletions(-) diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index f64a0f89eb..67ee674eba 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -882,6 +882,20 @@ update_out_rates (GstQueue2 * queue) queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); } +static void +update_cur_level (GstQueue2 * queue, GstQueue2Range * range) +{ + guint64 max_reading_pos, writing_pos; + + writing_pos = range->writing_pos; + max_reading_pos = range->max_reading_pos; + + if (writing_pos > max_reading_pos) + queue->cur_level.bytes = writing_pos - max_reading_pos; + else + queue->cur_level.bytes = 0; +} + static void gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) { @@ -912,6 +926,10 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) } writing_pos += size; + GST_INFO_OBJECT (queue, + "writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT, + writing_pos, max_reading_pos); + if (writing_pos > max_reading_pos) queue->cur_level.bytes = writing_pos - max_reading_pos; else @@ -919,9 +937,9 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) /* try to merge with next range */ while ((next = queue->current->next)) { - GST_DEBUG_OBJECT (queue, - "cheking %" G_GUINT64_FORMAT " < %" G_GUINT64_FORMAT, writing_pos, - next->offset); + GST_INFO_OBJECT (queue, + "checking merge with next range %" G_GUINT64_FORMAT " < %" + G_GUINT64_FORMAT, writing_pos, next->offset); if (writing_pos < next->offset) break; @@ -939,6 +957,21 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) queue->current->writing_pos = writing_pos; } +static void +update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos) +{ + guint64 reading_pos, max_reading_pos; + + reading_pos = pos; + max_reading_pos = range->max_reading_pos; + + max_reading_pos = MAX (max_reading_pos, reading_pos); + + range->max_reading_pos = max_reading_pos; + + update_cur_level (queue, range); +} + static gboolean perform_seek_to_offset (GstQueue2 * queue, guint64 offset) { @@ -958,6 +991,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset) if (res) { queue->current = add_range (queue, offset); + /* update the stats for this range */ + update_cur_level (queue, queue->current); } return res; } @@ -968,14 +1003,18 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) { GstQueue2Range *range; - GST_DEBUG_OBJECT (queue, "offset %" G_GUINT64_FORMAT ", len %u", offset, - length); + GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u", + offset, length); if ((range = find_range (queue, offset, length))) { if (queue->current != range) { - GST_DEBUG_OBJECT (queue, "switching ranges"); + GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position"); perform_seek_to_offset (queue, range->writing_pos); } + + /* update the current reading position in the range */ + update_cur_pos (queue, queue->current, offset + length); + /* we have a range for offset */ GST_DEBUG_OBJECT (queue, "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %" @@ -988,10 +1027,16 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) return TRUE; } else { + GST_INFO_OBJECT (queue, "not found in any range"); /* we don't have the range, see how far away we are, FIXME, find a good * threshold based on the incomming rate. */ - if (queue->current && offset < queue->current->writing_pos + 200000) - return FALSE; + if (queue->current) { + if (offset < queue->current->writing_pos + 200000) { + update_cur_pos (queue, queue->current, offset + length); + GST_INFO_OBJECT (queue, "wait for data"); + return FALSE; + } + } /* too far away, do a seek */ perform_seek_to_offset (queue, offset); @@ -1006,7 +1051,6 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, { size_t res; GstBuffer *buf; - guint64 reading_pos, max_reading_pos, writing_pos; /* check if we have enough data at @offset. If there is not enough data, we * block and wait. */ @@ -1049,21 +1093,6 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, *buffer = buf; - reading_pos = queue->current->reading_pos; - writing_pos = queue->current->writing_pos; - max_reading_pos = queue->current->max_reading_pos; - - reading_pos = offset + length; - max_reading_pos = MAX (max_reading_pos, reading_pos); - - if (writing_pos > max_reading_pos) - queue->cur_level.bytes = writing_pos - max_reading_pos; - else - queue->cur_level.bytes = 0; - - queue->current->reading_pos = reading_pos; - queue->current->max_reading_pos = max_reading_pos; - return GST_FLOW_OK; /* ERRORS */ @@ -1277,8 +1306,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) size = GST_BUFFER_SIZE (buffer); /* add buffer to the statistics */ - queue->cur_level.buffers++; - queue->cur_level.bytes += size; + if (!QUEUE_IS_USING_TEMP_FILE (queue)) { + queue->cur_level.buffers++; + queue->cur_level.bytes += size; + } queue->bytes_in += size; /* apply new buffer to segment stats */ @@ -1382,9 +1413,12 @@ gst_queue2_locked_dequeue (GstQueue2 * queue) GST_CAT_LOG_OBJECT (queue_dataflow, queue, "retrieved buffer %p from queue", buffer); - queue->cur_level.buffers--; - queue->cur_level.bytes -= size; + if (!QUEUE_IS_USING_TEMP_FILE (queue)) { + queue->cur_level.buffers--; + queue->cur_level.bytes -= size; + } queue->bytes_out += size; + apply_buffer (queue, buffer, &queue->src_segment); /* update the byterate stats */ update_out_rates (queue); @@ -1854,6 +1888,8 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) /* now unblock the getrange function */ GST_QUEUE2_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; + if (queue->current) + queue->current->max_reading_pos = 0; GST_QUEUE2_MUTEX_UNLOCK (queue); /* when using a temp file, we eat the event */