queue2: merge write buffer functions and fix bugs

Cached data could have been overwritten so it is now protected until
it is read. Similarly data was overread as _have_data () was always
looking for the originally requested data even if part of it had been
read already.
This commit is contained in:
Robert Swain 2010-06-18 14:36:33 +02:00 committed by Wim Taymans
parent 04f1572ea2
commit e29cca10a4

View file

@ -366,8 +366,8 @@ gst_queue2_class_init (GstQueue2Class * klass)
g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
g_param_spec_uint64 ("ring-buffer-max-size",
"Max. ring buffer size (bytes)",
"Max. amount of data in the ring buffer (bytes, 0=unlimited)",
DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
"Max. amount of data in the ring buffer (bytes, 0 = disabled",
0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* set several parent class virtual functions */
@ -951,89 +951,6 @@ update_out_rates (GstQueue2 * queue)
queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
}
#ifdef HAVE_FSEEKO
#define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0)
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
#define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
#else
#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0)
#endif
static gboolean
gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
{
guint size;
guint8 *data;
guint64 writing_pos, max_reading_pos;
GstQueue2Range *next;
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
if (FSEEK_FILE (queue->temp_file, writing_pos))
goto seek_failed;
data = GST_BUFFER_DATA (buffer);
size = GST_BUFFER_SIZE (buffer);
if (fwrite (data, size, 1, queue->temp_file) != 1)
goto handle_error;
writing_pos += size;
GST_INFO_OBJECT (queue,
"writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT,
writing_pos, max_reading_pos);
/* try to merge with next range */
while ((next = queue->current->next)) {
GST_INFO_OBJECT (queue,
"checking merge with next range %" G_GUINT64_FORMAT " < %"
G_GUINT64_FORMAT, writing_pos, next->offset);
if (writing_pos < next->offset)
break;
GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
next->writing_pos);
/* remove the group, we could choose to not read the data in this range
* again. This would involve us doing a seek to the current writing position
* in the range. FIXME, It would probably make sense to do a seek when there
* is a lot of data in the range we merged with to avoid reading it all
* again. */
queue->current->next = next->next;
g_slice_free (GstQueue2Range, next);
debug_ranges (queue);
}
queue->current->writing_pos = writing_pos;
update_cur_level (queue, queue->current);
return TRUE;
/* ERRORS */
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
handle_error:
{
switch (errno) {
case ENOSPC:{
GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
break;
}
default:{
GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
(_("Error while writing to download file.")),
("%s", g_strerror (errno)));
}
}
return FALSE;
}
}
static void
update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
{
@ -1087,9 +1004,6 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
perform_seek_to_offset (queue, range->writing_pos);
}
/* update the current reading position in the range */
update_cur_pos (queue, queue->current, offset + length);
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
@ -1133,6 +1047,14 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
return FALSE;
}
#ifdef HAVE_FSEEKO
#define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0)
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
#define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
#else
#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0)
#endif
static gint64
gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
guint8 * dst)
@ -1185,6 +1107,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
guint block_length, remaining, read_length;
gint64 read_return;
guint64 rb_size;
guint64 rpos;
/* allocate the output buffer of the requested size */
buf = gst_buffer_new_and_alloc (length);
@ -1193,20 +1116,21 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
offset);
rpos = queue->current->reading_pos = offset;
rb_size = queue->ring_buffer_max_size;
remaining = length;
while (remaining > 0) {
/* configure how much/whether to read */
if (!gst_queue2_have_data (queue, offset, length)) {
if (!gst_queue2_have_data (queue, rpos, remaining)) {
read_length = 0;
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
guint64 level;
/* calculate how far away the offset is */
if (queue->current->writing_pos > offset)
level = queue->current->writing_pos - offset;
if (queue->current->writing_pos > rpos)
level = queue->current->writing_pos - rpos;
else
level = 0;
@ -1224,7 +1148,14 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
read_length = rb_size;
}
}
if (read_length == 0) {
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
/* protect cached data (data between offset and max_reading_pos)
* and update current level */
queue->current->max_reading_pos = rpos;
update_cur_level (queue, queue->current);
}
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
continue;
}
@ -1236,7 +1167,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
/* congfigure how much and from where to read */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
(queue->current->rb_offset + (offset -
(queue->current->rb_offset + (rpos -
queue->current->offset)) % rb_size;
if (file_offset + read_length > rb_size) {
block_length = rb_size - file_offset;
@ -1244,7 +1175,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
block_length = read_length;
}
} else {
file_offset = offset;
file_offset = rpos;
block_length = read_length;
}
@ -1265,7 +1196,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
block_length = read_length;
remaining -= read_return;
queue->current->reading_pos += read_return;
rpos = (queue->current->reading_pos += read_return);
update_cur_pos (queue, queue->current, queue->current->reading_pos);
}
GST_QUEUE2_SIGNAL_DEL (queue);
GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
@ -1504,15 +1436,19 @@ out_flushing:
}
static gboolean
gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
{
guint8 *data;
guint size, rb_size;
guint64 writing_pos, new_writing_pos;
guint64 writing_pos, new_writing_pos, max_reading_pos;
gint64 space;
GstQueue2Range *range, *prev;
GstQueue2Range *range, *prev, *next;
writing_pos = queue->current->rb_writing_pos;
if (QUEUE_IS_USING_RING_BUFFER (queue))
writing_pos = queue->current->rb_writing_pos;
else
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
rb_size = queue->ring_buffer_max_size;
size = GST_BUFFER_SIZE (buffer);
@ -1521,46 +1457,77 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
while (size > 0) {
guint to_write;
/* calculate the space in the ring buffer not used by data from the
* current range */
while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
/* wait until there is some free space */
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
}
/* get the amount of space we have */
space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
/* calculate the space in the ring buffer not used by data from
* the current range */
while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
/* wait until there is some free space */
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
}
/* get the amount of space we have */
space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
/* calculate if we need to split or if we can write the entire buffer now */
to_write = MIN (size, space);
/* calculate if we need to split or if we can write the entire
* buffer now */
to_write = MIN (size, space);
/* the writing position in the ring buffer after writing (part or all of)
* the buffer */
new_writing_pos = (writing_pos + to_write) % rb_size;
/* the writing position in the ring buffer after writing (part
* or all of) the buffer */
new_writing_pos = (writing_pos + to_write) % rb_size;
prev = NULL;
range = queue->ranges;
debug_ranges (queue);
/* if we need to overwrite data in the ring buffer, we need to update the
* ranges
* warning: this code is complicated and includes some simplifications -
* pen, paper and diagrams for the cases recommended! */
while (range) {
guint64 range_data_start, range_data_end;
GstQueue2Range *range_to_destroy = NULL;
prev = NULL;
range = queue->ranges;
/* we don't edit the current range here */
if (range == queue->current)
goto next_range;
/* if we need to overwrite data in the ring buffer, we need to
* update the ranges
*
* warning: this code is complicated and includes some
* simplifications - pen, paper and diagrams for the cases
* recommended! */
while (range) {
guint64 range_data_start, range_data_end;
GstQueue2Range *range_to_destroy = NULL;
range_data_start = range->rb_offset;
range_data_end = range->rb_writing_pos;
if (range_data_end > range_data_start) {
if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
/* we don't edit the current range here */
if (range == queue->current)
goto next_range;
if (new_writing_pos > range_data_start) {
if (new_writing_pos >= range_data_end) {
range_data_start = range->rb_offset;
range_data_end = range->rb_writing_pos;
if (range_data_end > range_data_start) {
if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
goto next_range;
if (new_writing_pos > range_data_start) {
if (new_writing_pos >= range_data_end) {
GST_DEBUG_OBJECT (queue,
"Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
G_GUINT64_FORMAT, range->offset, range->writing_pos);
/* remove range */
range_to_destroy = range;
if (prev)
prev->next = range->next;
} else {
GST_DEBUG_OBJECT (queue,
"advancing offsets from %" G_GUINT64_FORMAT " (%"
G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
range->offset + new_writing_pos - range_data_start,
new_writing_pos);
range->offset += (new_writing_pos - range_data_start);
range->rb_offset = new_writing_pos;
}
}
} else {
guint64 new_wpos_virt = writing_pos + to_write;
if (new_wpos_virt <= range_data_start)
goto next_range;
if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
GST_DEBUG_OBJECT (queue,
"Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
G_GUINT64_FORMAT, range->offset, range->writing_pos);
@ -1575,47 +1542,28 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
range->offset + new_writing_pos - range_data_start,
new_writing_pos);
range->offset += (new_writing_pos - range_data_start);
range->offset += (new_wpos_virt - range_data_start);
range->rb_offset = new_writing_pos;
}
}
} else {
guint64 new_wpos_virt = writing_pos + to_write;
if (new_wpos_virt <= range_data_start)
goto next_range;
next_range:
if (!range_to_destroy)
prev = range;
if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
GST_DEBUG_OBJECT (queue,
"Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
G_GUINT64_FORMAT, range->offset, range->writing_pos);
/* remove range */
range_to_destroy = range;
if (prev)
prev->next = range->next;
} else {
GST_DEBUG_OBJECT (queue,
"advancing offsets from %" G_GUINT64_FORMAT " (%" G_GUINT64_FORMAT
") to %" G_GUINT64_FORMAT " (%" G_GUINT64_FORMAT ")",
range->offset, range->rb_offset,
range->offset + new_writing_pos - range_data_start,
new_writing_pos);
range->offset += (new_wpos_virt - range_data_start);
range->rb_offset = new_writing_pos;
range = range->next;
if (range_to_destroy) {
if (range_to_destroy == queue->ranges)
queue->ranges = range;
g_slice_free (GstQueue2Range, range_to_destroy);
range_to_destroy = NULL;
}
}
debug_ranges (queue);
next_range:
if (!range_to_destroy)
prev = range;
range = range->next;
if (range_to_destroy) {
if (range_to_destroy == queue->ranges)
queue->ranges = range;
g_slice_free (GstQueue2Range, range_to_destroy);
range_to_destroy = NULL;
}
} else {
space = to_write = size;
new_writing_pos = writing_pos + to_write;
}
if (FSEEK_FILE (queue->temp_file, writing_pos))
@ -1623,9 +1571,32 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
if (new_writing_pos > writing_pos) {
GST_INFO_OBJECT (queue, "writing %u bytes", to_write);
/* no wrapping, just write */
/* either not using ring buffer or no wrapping, just write */
if (fwrite (data, to_write, 1, queue->temp_file) != 1)
goto handle_error;
if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
/* try to merge with next range */
while ((next = queue->current->next)) {
GST_INFO_OBJECT (queue,
"checking merge with next range %" G_GUINT64_FORMAT " < %"
G_GUINT64_FORMAT, new_writing_pos, next->offset);
if (new_writing_pos < next->offset)
break;
GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
next->writing_pos);
/* we will run over the offset of the next group */
queue->current->writing_pos = new_writing_pos = next->writing_pos;
/* remove the group */
queue->current->next = next->next;
g_slice_free (GstQueue2Range, next);
debug_ranges (queue);
}
goto update_and_signal;
}
} else {
/* wrapping */
guint block_one, block_two;
@ -1650,17 +1621,22 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
}
}
update_and_signal:
/* update the writing positions */
size -= to_write;
data += to_write;
queue->current->writing_pos += to_write;
queue->current->rb_writing_pos = writing_pos = new_writing_pos;
update_cur_level (queue, queue->current);
GST_INFO_OBJECT (queue,
"wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
to_write, writing_pos, size);
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
data += to_write;
queue->current->writing_pos += to_write;
queue->current->rb_writing_pos = writing_pos = new_writing_pos;
} else {
queue->current->writing_pos = writing_pos = new_writing_pos;
}
update_cur_level (queue, queue->current);
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
@ -1721,13 +1697,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
/* update the byterate stats */
update_in_rates (queue);
/* FIXME - check return values? */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
gst_queue2_write_buffer_to_ring_buffer (queue, buffer);
} else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
gst_queue2_write_buffer_to_file (queue, buffer);
}
/* FIXME - check return value? */
gst_queue2_write_buffer_to_file (queue, buffer);
} else if (GST_IS_EVENT (item)) {
GstEvent *event;