queue2: ring buffer work in progress

This commit is contained in:
Robert Swain 2010-05-07 09:30:44 +02:00 committed by Wim Taymans
parent f093707189
commit d1809558e5
2 changed files with 366 additions and 73 deletions

View file

@ -93,6 +93,11 @@ enum
LAST_SIGNAL
};
/* other defines */
#define DEFAULT_BUFFER_SIZE 4096
#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer) /* for consistency with the above macro */
/* default property values */
#define DEFAULT_MAX_SIZE_BUFFERS 100 /* 100 buffers */
#define DEFAULT_MAX_SIZE_BYTES (2 * 1024 * 1024) /* 2 MB */
@ -103,11 +108,7 @@ enum
#define DEFAULT_HIGH_PERCENT 99
#define DEFAULT_TEMP_REMOVE TRUE
#define DEFAULT_USE_RING_BUFFER FALSE
#define DEFAULT_RING_BUFFER_MAX_SIZE (16 * 1024 * 1024) /* 16 MB */
/* other defines */
#define DEFAULT_BUFFER_SIZE 4096
#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
#define DEFAULT_RING_BUFFER_MAX_SIZE (1024 * DEFAULT_BUFFER_SIZE) /* 4 MB */
enum
{
@ -376,7 +377,7 @@ gst_queue2_class_init (GstQueue2Class * klass)
g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
g_param_spec_uint ("ring-buffer-max-size", "Max. ring buffer size (kB)",
"Max. amount of data in the ring buffer (bytes, 0=unlimited)",
0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* set several parent class virtual functions */
@ -458,6 +459,8 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
queue->temp_location_set = FALSE;
queue->temp_remove = DEFAULT_TEMP_REMOVE;
queue->use_ring_buffer = DEFAULT_USE_RING_BUFFER;
queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
GST_DEBUG_OBJECT (queue,
"initialized queue's not_empty & not_full conditions");
}
@ -549,8 +552,12 @@ add_range (GstQueue2 * queue, guint64 offset)
range = g_slice_new0 (GstQueue2Range);
range->offset = offset;
/* we want to write to the next location in the ring buffer */
range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
range->writing_pos = offset;
range->rb_writing_pos = range->rb_offset;
range->reading_pos = offset;
range->rb_reading_pos = range->rb_offset;
range->max_reading_pos = offset;
/* insert sorted */
@ -792,7 +799,7 @@ update_buffering (GstQueue2 * queue)
queue->buffering_percent = percent;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
GstFormat fmt = GST_FORMAT_BYTES;
gint64 duration;
@ -941,6 +948,14 @@ update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
queue->cur_level.bytes = 0;
}
#ifdef HAVE_FSEEKO
#define FSEEK_FILE(file, offset) (fseeko (file, (off_t) offset, SEEK_SET))
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
#define FSEEK_FILE(file, offset) (lseek (fileno (file), (off_t) offset, SEEK_SET))
#else
#define FSEEK_FILE(file, offset) (fseek (file, offset, SEEK_SET))
#endif
static gboolean
gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
{
@ -952,13 +967,7 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
#ifdef HAVE_FSEEKO
fseeko (queue->temp_file, (off_t) writing_pos, SEEK_SET);
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
lseek (fileno (queue->temp_file), (off_t) writing_pos, SEEK_SET);
#else
fseek (queue->temp_file, writing_pos, SEEK_SET);
#endif
FSEEK_FILE (queue->temp_file, writing_pos);
data = GST_BUFFER_DATA (buffer);
size = GST_BUFFER_SIZE (buffer);
@ -1073,6 +1082,10 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
/* update the current reading position in the range */
update_cur_pos (queue, queue->current, offset + length);
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
queue->cur_level.bytes, MIN (queue->max_level.bytes,
queue->ring_buffer_max_size));
/* we have a range for offset */
GST_DEBUG_OBJECT (queue,
"we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
@ -1104,17 +1117,10 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
}
static GstFlowReturn
gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GstBuffer ** buffer)
gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
guint8 * dst)
{
size_t res;
GstBuffer *buf;
/* check if we have enough data at @offset. If there is not enough data, we
* block and wait. */
while (!gst_queue2_have_data (queue, offset, length)) {
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
}
#ifdef HAVE_FSEEKO
if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
@ -1128,14 +1134,13 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
goto seek_failed;
#endif
buf = gst_buffer_new_and_alloc (length);
/* this should not block */
GST_LOG_OBJECT (queue, "Reading %d bytes", length);
res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file);
GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
length, offset);
res = fread (dst, 1, length, queue->temp_file);
GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
if (G_UNLIKELY (res == 0)) {
if (G_UNLIKELY (res < length)) {
/* check for errors or EOF */
if (ferror (queue->temp_file))
goto could_not_read;
@ -1143,7 +1148,77 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
goto eos;
}
length = res;
return GST_FLOW_OK;
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return GST_FLOW_ERROR;
}
could_not_read:
{
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return GST_FLOW_ERROR;
}
eos:
{
GST_DEBUG ("non-regular file hits EOS");
return GST_FLOW_UNEXPECTED;
}
}
static GstFlowReturn
gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GstBuffer ** buffer)
{
GstFlowReturn flow_ret;
GstBuffer *buf;
guint8 *data;
guint64 file_offset;
guint block_length;
/* check if we have enough data at @offset. If there is not enough data, we
* block and wait. */
while (!gst_queue2_have_data (queue, offset, length)) {
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
}
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
(queue->current->rb_offset + (offset -
queue->current->offset)) % queue->ring_buffer_max_size;
if (file_offset + length > queue->ring_buffer_max_size) {
block_length = queue->ring_buffer_max_size - file_offset;
} else {
block_length = length;
}
} else {
file_offset = offset;
block_length = length;
}
buf = gst_buffer_new_and_alloc (length);
data = GST_BUFFER_DATA (buf);
if ((flow_ret =
gst_queue2_read_data_at_offset (queue, file_offset, block_length,
data)) != GST_FLOW_OK) {
gst_buffer_unref (buf);
return flow_ret;
}
if (block_length < length) {
/* read second block into a second buffer, then merge the two */
data += block_length;
block_length = length - block_length;
if ((flow_ret =
gst_queue2_read_data_at_offset (queue, 0, block_length,
data)) != GST_FLOW_OK) {
gst_buffer_unref (buf);
return flow_ret;
}
}
GST_BUFFER_SIZE (buf) = length;
GST_BUFFER_OFFSET (buf) = offset;
@ -1159,23 +1234,6 @@ out_flushing:
GST_DEBUG_OBJECT (queue, "we are flushing");
return GST_FLOW_WRONG_STATE;
}
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return GST_FLOW_ERROR;
}
could_not_read:
{
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
eos:
{
GST_DEBUG ("non-regular file hits EOS");
gst_buffer_unref (buf);
return GST_FLOW_UNEXPECTED;
}
}
/* should be called with QUEUE_LOCK */
@ -1200,6 +1258,11 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
switch (ret) {
case GST_FLOW_OK:
item = GST_MINI_OBJECT_CAST (buffer);
queue->current->reading_pos += DEFAULT_BUFFER_SIZE;
if (QUEUE_IS_USING_RING_BUFFER (queue))
queue->current->rb_reading_pos =
(queue->current->rb_reading_pos +
DEFAULT_BUFFER_SIZE) % queue->ring_buffer_max_size;
break;
case GST_FLOW_UNEXPECTED:
item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
@ -1329,7 +1392,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
static void
gst_queue2_locked_flush (GstQueue2 * queue)
{
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
gst_queue2_flush_temp_file (queue);
} else {
while (!g_queue_is_empty (queue->queue)) {
@ -1352,6 +1415,205 @@ gst_queue2_locked_flush (GstQueue2 * queue)
GST_QUEUE2_SIGNAL_DEL (queue);
}
static gboolean
gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
{
GstBuffer *buf, *rem;
guint buf_size, rem_size;
const guint rb_size = queue->ring_buffer_max_size;
guint8 *data;
guint64 writing_pos, reading_pos, new_writing_pos;
gint64 space;
GstQueue2Range *range, *prev;
writing_pos = queue->current->rb_writing_pos;
reading_pos = queue->current->rb_reading_pos;
rem = buffer;
/* loop if we can't write the whole buffer at once */
do {
/* calculate the space in the ring buffer not used by data from the
* current range */
space =
MIN (queue->max_level.bytes,
queue->ring_buffer_max_size) - queue->cur_level.bytes;
rem_size = GST_BUFFER_SIZE (rem);
/* don't try to process 0 size buffers */
if (!rem_size)
break;
/* calculate if we need to split or if we can write the entire buffer now */
if (rem_size > space) {
buf_size = space;
buf = gst_buffer_create_sub (rem, 0, space);
rem_size -= space;
rem = gst_buffer_create_sub (rem, space, rem_size);
space = 0;
} else {
buf_size = rem_size;
buf = rem;
rem_size = 0;
rem = NULL;
space -= buf_size;
}
data = GST_BUFFER_DATA (buf);
/* the writing position in the ring buffer after writing (part or all of)
* the buffer */
new_writing_pos = (writing_pos + buf_size) % rb_size;
prev = NULL;
range = queue->ranges;
/* if we need to overwrite data in the ring buffer, we need to update the
* ranges
* warning: this code is complicated and includes some simplifications -
* pen, paper and diagrams for the cases recommended! */
while (range) {
guint64 range_data_start, range_data_end;
GstQueue2Range *range_to_destroy = NULL;
/* we don't edit the current range here */
if (range == queue->current)
goto next_range;
range_data_start = range->rb_offset;
range_data_end = range->rb_writing_pos;
if (range_data_end > range_data_start) {
if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
goto next_range;
if (new_writing_pos > range_data_start) {
if (new_writing_pos >= range_data_end) {
/* remove range */
range_to_destroy = range;
if (prev)
prev->next = range->next;
} else {
range->offset += (new_writing_pos - range_data_start);
range->rb_offset = new_writing_pos;
}
}
} else {
guint64 new_wpos_virt = writing_pos + buf_size;
if (new_wpos_virt <= range_data_start)
goto next_range;
if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
/* remove range */
range_to_destroy = range;
if (prev)
prev->next = range->next;
} else {
range->offset += (new_wpos_virt - range_data_start);
range->rb_offset = new_writing_pos;
}
}
next_range:
if (!range_to_destroy)
prev = range;
range = range->next;
if (range_to_destroy) {
if (range_to_destroy == queue->ranges)
queue->ranges = range;
g_slice_free1 (sizeof (GstQueue2Range), range_to_destroy);
range_to_destroy = NULL;
}
}
FSEEK_FILE (queue->temp_file, writing_pos);
if (new_writing_pos > writing_pos) {
/* no wrapping, just write */
if (fwrite (data, buf_size, 1, queue->temp_file) != 1)
goto handle_error;
} else {
/* wrapping */
guint block_one, block_two;
block_one = rb_size - writing_pos;
block_two = buf_size - block_one;
/* write data to end of ring buffer */
if (fwrite (data, block_one, 1, queue->temp_file) != 1)
goto handle_error;
FSEEK_FILE (queue->temp_file, 0);
data += block_one;
if (fwrite (data, block_two, 1, queue->temp_file) != 1)
goto handle_error;
}
/* update the writing positions */
GST_INFO_OBJECT (queue, "wrote %u bytes to %" G_GUINT64_FORMAT, buf_size,
writing_pos);
queue->current->writing_pos += buf_size;
queue->current->rb_writing_pos = writing_pos = new_writing_pos;
update_cur_level (queue, queue->current);
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
queue->cur_level.bytes, MIN (queue->max_level.bytes,
queue->ring_buffer_max_size));
/* if we have a remainder of the buffer data, wait until there's space to
* write before looping */
if (rem_size) {
gboolean started;
/* pause the timer while we wait. The fact that we are waiting does not mean
* the byterate on the input pad is lower */
if ((started = queue->in_timer_started))
g_timer_stop (queue->in_timer);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"queue is full, waiting for free space");
while (gst_queue2_is_filled (queue)) {
/* Wait for space to be available, we could be unlocked because of a flush. */
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
}
/* and continue if we were running before */
if (started)
g_timer_continue (queue->in_timer);
}
} while (rem_size);
return TRUE;
/* ERRORS */
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
/* FIXME - GST_FLOW_UNEXPECTED ? */
return FALSE;
}
handle_error:
{
switch (errno) {
case ENOSPC:{
GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
break;
}
default:{
GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
(_("Error while writing to download file.")),
("%s", g_strerror (errno)));
}
}
return FALSE;
}
}
/* enqueue an item an update the level stats */
static void
gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
@ -1364,7 +1626,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
size = GST_BUFFER_SIZE (buffer);
/* add buffer to the statistics */
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!(QUEUE_IS_USING_TEMP_FILE (queue)
|| QUEUE_IS_USING_RING_BUFFER (queue))) {
queue->cur_level.buffers++;
queue->cur_level.bytes += size;
}
@ -1375,7 +1638,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
/* update the byterate stats */
update_in_rates (queue);
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
/* FIXME - check return values? */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
gst_queue2_write_buffer_to_ring_buffer (queue, buffer);
} else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
gst_queue2_write_buffer_to_file (queue, buffer);
}
@ -1395,7 +1661,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
apply_segment (queue, event, &queue->sink_segment);
/* This is our first new segment, we hold it
* as we can't save it on the temp file */
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue)) {
if (queue->segment_event_received)
goto unexpected_event;
@ -1410,7 +1677,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
queue->unexpected = FALSE;
break;
default:
if (QUEUE_IS_USING_TEMP_FILE (queue))
if (QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue))
goto unexpected_event;
break;
}
@ -1425,7 +1693,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
/* update the buffering status */
update_buffering (queue);
if (!QUEUE_IS_USING_TEMP_FILE (queue))
if (!(QUEUE_IS_USING_TEMP_FILE (queue)
|| QUEUE_IS_USING_RING_BUFFER (queue)))
g_queue_push_tail (queue->queue, item);
else
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
@ -1453,7 +1722,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
{
GstMiniObject *item;
if (QUEUE_IS_USING_TEMP_FILE (queue))
if (QUEUE_IS_USING_TEMP_FILE (queue) || QUEUE_IS_USING_RING_BUFFER (queue))
item = gst_queue2_read_item_from_file (queue);
else
item = g_queue_pop_head (queue->queue);
@ -1471,7 +1740,8 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved buffer %p from queue", buffer);
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!(QUEUE_IS_USING_TEMP_FILE (queue)
|| QUEUE_IS_USING_RING_BUFFER (queue))) {
queue->cur_level.buffers--;
queue->cur_level.bytes -= size;
}
@ -1529,7 +1799,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
case GST_EVENT_FLUSH_START:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!(QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue))) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
@ -1553,7 +1824,8 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!(QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue))) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
@ -1619,8 +1891,8 @@ gst_queue2_is_empty (GstQueue2 * queue)
if (queue->is_eos)
return FALSE;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
return queue->current->writing_pos == queue->current->max_reading_pos;
if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
return queue->current->writing_pos <= queue->current->max_reading_pos;
} else {
if (queue->queue->length == 0)
return TRUE;
@ -1638,6 +1910,12 @@ gst_queue2_is_filled (GstQueue2 * queue)
if (queue->is_eos)
return TRUE;
/* if using a ring buffer we're filled if all ring buffer space is used
* _by the current range_ */
if (QUEUE_IS_USING_RING_BUFFER (queue))
return queue->cur_level.bytes >= MIN (queue->max_level.bytes,
queue->ring_buffer_max_size);
/* if using file, we're never filled if we don't have EOS */
if (QUEUE_IS_USING_TEMP_FILE (queue))
return FALSE;
@ -1922,7 +2200,8 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!(QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue))) {
/* just forward upstream */
res = gst_pad_push_event (queue->sinkpad, event);
} else {
@ -1939,7 +2218,8 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
}
break;
case GST_EVENT_FLUSH_STOP:
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!(QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue))) {
/* just forward upstream */
res = gst_pad_push_event (queue->sinkpad, event);
} else {
@ -2028,7 +2308,8 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
GST_DEBUG_OBJECT (queue, "query buffering");
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!(QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue))) {
/* no temp file, just forward to the peer */
if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
goto peer_failed;
@ -2084,8 +2365,9 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
if (!peer_res)
goto peer_failed;
GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %"
G_GINT64_FORMAT, duration, writing_pos);
GST_DEBUG_OBJECT (queue,
"duration %" G_GINT64_FORMAT ", writing %" G_GINT64_FORMAT,
duration, writing_pos);
start = 0;
/* get our available data relative to the duration */
@ -2143,12 +2425,18 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
GstFlowReturn ret;
queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
if (length > queue->ring_buffer_max_size) {
GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT,
(_("Buffer is too large to fit in ring buffer")),
("(%u > %" G_GUINT64_FORMAT ")", length, queue->ring_buffer_max_size));
return GST_FLOW_ERROR;
}
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
offset = (offset == -1) ? queue->current->reading_pos : offset;
/* function will block when the range is not yet available */
/* FIXME - function will block when the range is not yet available */
ret = gst_queue2_create_read (queue, offset, length, buffer);
GST_QUEUE2_MUTEX_UNLOCK (queue);
@ -2176,7 +2464,7 @@ gst_queue2_src_checkgetrange_function (GstPad * pad)
queue = GST_QUEUE2 (gst_pad_get_parent (pad));
/* we can operate in pull mode when we are using a tempfile */
ret = QUEUE_IS_USING_TEMP_FILE (queue);
ret = QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue);
gst_object_unref (GST_OBJECT (queue));
@ -2264,7 +2552,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
queue = GST_QUEUE2 (gst_pad_get_parent (pad));
if (active) {
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
/* open the temp file now */
result = gst_queue2_open_temp_location_file (queue);
@ -2312,7 +2600,8 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!gst_queue2_open_temp_location_file (queue))
ret = GST_STATE_CHANGE_FAILURE;
}
@ -2337,7 +2626,8 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (QUEUE_IS_USING_TEMP_FILE (queue))
if (QUEUE_IS_USING_RING_BUFFER (queue)
|| QUEUE_IS_USING_TEMP_FILE (queue))
gst_queue2_close_temp_location_file (queue);
if (queue->starting_segment != NULL) {
gst_event_unref (queue->starting_segment);

View file

@ -60,10 +60,13 @@ struct _GstQueue2Range
{
GstQueue2Range *next;
guint64 offset;
guint64 writing_pos;
guint64 reading_pos;
guint64 max_reading_pos;
guint64 offset; /* offset of range start in source */
guint64 rb_offset; /* offset of range start in ring buffer */
guint64 writing_pos; /* writing position in source */
guint64 rb_writing_pos; /* writing position in ring buffer */
guint64 reading_pos; /* reading position in source */
guint64 rb_reading_pos; /* reading position in ring buffer */
guint64 max_reading_pos; /* latest requested offset in source */
};
struct _GstQueue2
@ -134,7 +137,7 @@ struct _GstQueue2
GstEvent *starting_segment;
gboolean use_ring_buffer;
guint ring_buffer_max_size;
guint64 ring_buffer_max_size;
};
struct _GstQueue2Class