queue2: More ring buffer fixes

- Set reading_pos correctly in _create_read ()
- Seek to data if it is further than QUEUE_MAX_BYTES (queue) -
  cur_level.bytes away. This should avoid a situation where the ring
  buffer is full but the data offset from which we shall read is not in
  the ring buffer.
- Only update the max_reading_pos to a lower value to protect data when
  necessary
- Always signal an ADD in _locked_enqueue () so that an EOS unlocks the
  reader
- More useful debug output
This commit is contained in:
Robert Swain 2010-07-02 17:40:08 +02:00 committed by Wim Taymans
parent c78996a313
commit 0182c0d88b

View file

@ -525,6 +525,13 @@ find_range (GstQueue2 * queue, guint64 offset, guint64 length)
break;
}
}
if (range) {
GST_DEBUG_OBJECT (queue,
"found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
} else {
GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
}
return range;
}
@ -965,8 +972,8 @@ update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
max_reading_pos = MAX (max_reading_pos, reading_pos);
GST_DEBUG_OBJECT (queue,
"updating max_reading_pos to %" G_GUINT64_FORMAT " from %"
G_GUINT64_FORMAT, max_reading_pos, range->max_reading_pos);
"updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
range->max_reading_pos = max_reading_pos;
update_cur_level (queue, range);
@ -1035,9 +1042,12 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
if (!queue->is_eos && queue->current) {
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
if (offset < queue->current->offset || offset >
queue->current->writing_pos + queue->max_level.bytes -
queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
queue->cur_level.bytes) {
perform_seek_to_offset (queue, offset);
} else {
GST_INFO_OBJECT (queue,
"requested data is within range, wait for data");
}
} else if (offset < queue->current->writing_pos + 200000) {
update_cur_pos (queue, queue->current, offset + length);
@ -1142,8 +1152,6 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
if (!gst_queue2_have_data (queue, rpos, remaining)) {
read_length = 0;
queue->current->reading_pos = rpos;
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
guint64 level;
@ -1156,7 +1164,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GST_DEBUG_OBJECT (queue,
"reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
", level %" G_GUINT64_FORMAT,
queue->current->reading_pos, queue->current->writing_pos, level);
rpos, queue->current->writing_pos, level);
if (level >= rb_size) {
/* we don't have the data but if we have a ring buffer that is full, we
@ -1165,11 +1173,23 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
"ring buffer full, reading ring-buffer-max-size %d bytes",
rb_size);
read_length = rb_size;
} else if (queue->is_eos) {
/* won't get any more data so read any data we have */
if (level) {
GST_DEBUG_OBJECT (queue, "EOS hit but read %u bytes that we have",
level);
read_length = level;
} else {
GST_DEBUG_OBJECT (queue,
"EOS hit and we don't have any requested data");
return GST_FLOW_UNEXPECTED;
}
}
}
if (read_length == 0) {
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
if (QUEUE_IS_USING_RING_BUFFER (queue)
&& queue->current->max_reading_pos > rpos) {
/* protect cached data (data between offset and max_reading_pos)
* and update current level */
GST_DEBUG_OBJECT (queue,
@ -1186,6 +1206,9 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
read_length = remaining;
}
/* set range reading_pos to actual reading position for this read */
queue->current->reading_pos = rpos;
/* congfigure how much and from where to read */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
@ -1475,6 +1498,9 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
size = GST_BUFFER_SIZE (buffer);
data = GST_BUFFER_DATA (buffer);
GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
GST_BUFFER_OFFSET (buffer));
while (size > 0) {
guint to_write;
@ -1588,7 +1614,10 @@ gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
goto seek_failed;
if (new_writing_pos > writing_pos) {
GST_INFO_OBJECT (queue, "writing %u bytes", to_write);
GST_INFO_OBJECT (queue,
"writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
"] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
queue->current->writing_pos, queue->current->rb_writing_pos);
/* either not using ring buffer or no wrapping, just write */
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (fwrite (data, to_write, 1, queue->temp_file) != 1)
@ -1788,8 +1817,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
}
if (!QUEUE_IS_USING_RING_BUFFER (queue))
GST_QUEUE2_SIGNAL_ADD (queue);
GST_QUEUE2_SIGNAL_ADD (queue);
}
return;