queue2: extend ring buffer to support ram mode

This commit is contained in:
Robert Swain 2010-06-18 17:43:40 +02:00 committed by Wim Taymans
parent e29cca10a4
commit 9df54eb4ff
2 changed files with 74 additions and 28 deletions

View file

@ -64,6 +64,8 @@
#include "gst/gst-i18n-lib.h"
#include <string.h>
#ifdef G_OS_WIN32
#include <io.h> /* lseek, open, close, read */
#undef lseek
@ -450,6 +452,7 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
queue->temp_remove = DEFAULT_TEMP_REMOVE;
queue->use_ring_buffer = FALSE;
queue->ring_buffer = NULL;
queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
GST_DEBUG_OBJECT (queue,
@ -1059,18 +1062,29 @@ static gint64
gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
guint8 * dst)
{
guint8 *ring_buffer;
size_t res;
if (FSEEK_FILE (queue->temp_file, offset))
ring_buffer = queue->ring_buffer;
if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
goto seek_failed;
/* this should not block */
GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
length, offset);
res = fread (dst, 1, length, queue->temp_file);
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
res = fread (dst, 1, length, queue->temp_file);
} else {
memcpy (dst, ring_buffer + offset, length);
res = length;
}
GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
if (G_UNLIKELY (res < length)) {
if (!QUEUE_IS_USING_TEMP_FILE (queue))
goto could_not_read;
/* check for errors or EOF */
if (ferror (queue->temp_file))
goto could_not_read;
@ -1307,8 +1321,6 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue)
}
GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
init_ranges (queue);
return TRUE;
/* ERRORS */
@ -1370,15 +1382,15 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
GST_DEBUG_OBJECT (queue, "flushing temp file");
queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
init_ranges (queue);
}
static void
gst_queue2_locked_flush (GstQueue2 * queue)
{
if (!QUEUE_IS_USING_QUEUE (queue)) {
gst_queue2_flush_temp_file (queue);
if (QUEUE_IS_USING_TEMP_FILE (queue))
gst_queue2_flush_temp_file (queue);
init_ranges (queue);
} else {
while (!g_queue_is_empty (queue->queue)) {
GstMiniObject *data = g_queue_pop_head (queue->queue);
@ -1436,9 +1448,9 @@ out_flushing:
}
static gboolean
gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
{
guint8 *data;
guint8 *data, *ring_buffer;
guint size, rb_size;
guint64 writing_pos, new_writing_pos, max_reading_pos;
gint64 space;
@ -1449,6 +1461,7 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
else
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
ring_buffer = queue->ring_buffer;
rb_size = queue->ring_buffer_max_size;
size = GST_BUFFER_SIZE (buffer);
@ -1475,8 +1488,6 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
* or all of) the buffer */
new_writing_pos = (writing_pos + to_write) % rb_size;
debug_ranges (queue);
prev = NULL;
range = queue->ranges;
@ -1559,21 +1570,24 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
range_to_destroy = NULL;
}
}
debug_ranges (queue);
} else {
space = to_write = size;
new_writing_pos = writing_pos + to_write;
}
if (FSEEK_FILE (queue->temp_file, writing_pos))
if (QUEUE_IS_USING_TEMP_FILE (queue)
&& FSEEK_FILE (queue->temp_file, writing_pos))
goto seek_failed;
if (new_writing_pos > writing_pos) {
GST_INFO_OBJECT (queue, "writing %u bytes", to_write);
/* either not using ring buffer or no wrapping, just write */
if (fwrite (data, to_write, 1, queue->temp_file) != 1)
goto handle_error;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (fwrite (data, to_write, 1, queue->temp_file) != 1)
goto handle_error;
} else {
memcpy (ring_buffer + writing_pos, data, to_write);
}
if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
/* try to merge with next range */
@ -1607,17 +1621,25 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
if (block_one > 0) {
GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
/* write data to end of ring buffer */
if (fwrite (data, block_one, 1, queue->temp_file) != 1)
goto handle_error;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (fwrite (data, block_one, 1, queue->temp_file) != 1)
goto handle_error;
} else {
memcpy (ring_buffer + writing_pos, data, block_one);
}
}
if (FSEEK_FILE (queue->temp_file, 0))
if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
goto seek_failed;
if (block_two > 0) {
GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
goto handle_error;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
goto handle_error;
} else {
memcpy (ring_buffer, data + block_one, block_two);
}
}
}
@ -1698,7 +1720,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
update_in_rates (queue);
/* FIXME - check return value? */
gst_queue2_write_buffer_to_file (queue, buffer);
gst_queue2_create_write (queue, buffer);
} else if (GST_IS_EVENT (item)) {
GstEvent *event;
@ -2598,8 +2620,15 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
if (active) {
if (!QUEUE_IS_USING_QUEUE (queue)) {
/* open the temp file now */
result = gst_queue2_open_temp_location_file (queue);
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
/* open the temp file now */
result = gst_queue2_open_temp_location_file (queue);
} else if (!queue->ring_buffer) {
queue->ring_buffer = malloc (queue->ring_buffer_max_size);
result = !!queue->ring_buffer;
}
init_ranges (queue);
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating pull mode");
@ -2646,8 +2675,18 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (!QUEUE_IS_USING_QUEUE (queue)) {
if (!gst_queue2_open_temp_location_file (queue))
ret = GST_STATE_CHANGE_FAILURE;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!gst_queue2_open_temp_location_file (queue))
ret = GST_STATE_CHANGE_FAILURE;
} else {
if (queue->ring_buffer) {
free (queue->ring_buffer);
queue->ring_buffer = NULL;
}
if (!(queue->ring_buffer = malloc (queue->ring_buffer_max_size)))
ret = GST_STATE_CHANGE_FAILURE;
}
init_ranges (queue);
}
queue->segment_event_received = FALSE;
queue->starting_segment = NULL;
@ -2670,8 +2709,14 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (!QUEUE_IS_USING_QUEUE (queue))
gst_queue2_close_temp_location_file (queue);
if (!QUEUE_IS_USING_QUEUE (queue)) {
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
gst_queue2_close_temp_location_file (queue);
} else if (queue->ring_buffer) {
free (queue->ring_buffer);
queue->ring_buffer = NULL;
}
}
if (queue->starting_segment != NULL) {
gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL;

View file

@ -137,6 +137,7 @@ struct _GstQueue2
gboolean use_ring_buffer;
guint64 ring_buffer_max_size;
guint8 * ring_buffer;
};
struct _GstQueue2Class