queue2: Clean up and improve code

This commit is contained in:
Robert Swain 2010-06-06 09:30:48 +02:00 committed by Wim Taymans
parent bde816451e
commit 500d6a9986

View file

@ -364,7 +364,7 @@ gst_queue2_class_init (GstQueue2Class * klass)
*/
g_object_class_install_property (gobject_class, PROP_USE_RING_BUFFER,
g_param_spec_boolean ("use-ring-buffer", "Use a ring buffer",
"Use a ring buffer of size ring-buffer-max-size kB",
"Use a ring buffer of size ring-buffer-max-size bytes",
DEFAULT_USE_RING_BUFFER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstQueue2:ring-buffer-max-size
@ -375,7 +375,8 @@ gst_queue2_class_init (GstQueue2Class * klass)
* Since: 0.10.30
*/
g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
g_param_spec_uint ("ring-buffer-max-size", "Max. ring buffer size (kB)",
g_param_spec_uint ("ring-buffer-max-size",
"Max. ring buffer size (bytes)",
"Max. amount of data in the ring buffer (bytes, 0=unlimited)",
DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
@ -754,8 +755,7 @@ update_buffering (GstQueue2 * queue)
if (!queue->use_buffering || queue->high_percent <= 0)
return;
#define GET_PERCENT(format) ((queue->max_level.format) > 0 ? \
(queue->cur_level.format) * 100 / (queue->max_level.format) : 0)
#define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
if (queue->is_eos) {
/* on EOS we are always 100% full, we set the var here so that it we can
@ -765,19 +765,17 @@ update_buffering (GstQueue2 * queue)
/* figure out the percent we are filled, we take the max of all formats. */
if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
percent = GET_PERCENT (bytes);
percent = GET_PERCENT (bytes, 0);
} else {
guint max_bytes = queue->max_level.bytes;
guint64 rb_size = queue->ring_buffer_max_size;
max_bytes = max_bytes ? MIN (max_bytes, rb_size) : rb_size;
percent = max_bytes ? queue->cur_level.bytes * 100 / max_bytes : 0;
percent = GET_PERCENT (bytes, rb_size);
}
percent = MAX (percent, GET_PERCENT (time));
percent = MAX (percent, GET_PERCENT (buffers));
percent = MAX (percent, GET_PERCENT (time, 0));
percent = MAX (percent, GET_PERCENT (buffers, 0));
/* also apply the rate estimate when we need to */
if (queue->use_rate_estimate)
percent = MAX (percent, GET_PERCENT (rate_time));
percent = MAX (percent, GET_PERCENT (rate_time, 0));
}
if (queue->is_buffering) {
@ -1099,7 +1097,7 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
"we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
if (queue->is_eos)
if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
return TRUE;
if (offset + length <= range->writing_pos)
@ -1107,7 +1105,7 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
else
GST_DEBUG_OBJECT (queue,
"Need more data (%" G_GUINT64_FORMAT " bytes more)",
range->writing_pos - (offset + length));
(offset + length) - range->writing_pos);
} else {
GST_INFO_OBJECT (queue, "not found in any range");
@ -1122,8 +1120,8 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
} else if (offset < queue->current->writing_pos + 200000) {
update_cur_pos (queue, queue->current, offset + length);
GST_INFO_OBJECT (queue, "wait for data");
return FALSE;
}
return FALSE;
}
/* too far away, do a seek */
@ -1133,7 +1131,7 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
return FALSE;
}
static GstFlowReturn
static gint64
gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
guint8 * dst)
{
@ -1165,7 +1163,7 @@ gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
goto eos;
}
return GST_FLOW_OK;
return res;
seek_failed:
{
@ -1188,53 +1186,81 @@ static GstFlowReturn
gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GstBuffer ** buffer)
{
GstFlowReturn flow_ret;
GstBuffer *buf;
guint8 *data;
guint64 file_offset;
guint block_length;
/* check if we have enough data at @offset. If there is not enough data, we
* block and wait. */
while (!gst_queue2_have_data (queue, offset, length)) {
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
}
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
(queue->current->rb_offset + (offset -
queue->current->offset)) % queue->ring_buffer_max_size;
if (file_offset + length > queue->ring_buffer_max_size) {
block_length = queue->ring_buffer_max_size - file_offset;
} else {
block_length = length;
}
} else {
file_offset = offset;
block_length = length;
}
guint block_length, remaining, read_length;
/* allocate the output buffer of the requested size */
buf = gst_buffer_new_and_alloc (length);
data = GST_BUFFER_DATA (buf);
if ((flow_ret =
gst_queue2_read_data_at_offset (queue, file_offset, block_length,
data)) != GST_FLOW_OK) {
gst_buffer_unref (buf);
return flow_ret;
}
GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
offset);
if (block_length < length) {
/* read second block into a second buffer, then merge the two */
data += block_length;
block_length = length - block_length;
if ((flow_ret =
gst_queue2_read_data_at_offset (queue, 0, block_length,
data)) != GST_FLOW_OK) {
gst_buffer_unref (buf);
return flow_ret;
remaining = length;
while (remaining > 0) {
/* configure how much/whether to read */
if (!gst_queue2_have_data (queue, offset, length)) {
if (QUEUE_IS_USING_RING_BUFFER (queue)
&& queue->current->writing_pos - queue->current->reading_pos >=
queue->ring_buffer_max_size) {
/* we don't have the data but if we have a ring buffer that is full, we
* need to read */
GST_DEBUG_OBJECT (queue,
"ring buffer full, reading ring-buffer-max-size %d bytes",
queue->ring_buffer_max_size);
read_length = queue->ring_buffer_max_size;
} else {
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
continue;
}
} else {
/* we have the requested data so read it */
read_length = remaining;
}
/* congfigure how much and from where to read */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
(queue->current->rb_offset + (offset -
queue->current->offset)) % queue->ring_buffer_max_size;
if (file_offset + read_length > queue->ring_buffer_max_size) {
block_length = queue->ring_buffer_max_size - file_offset;
} else {
block_length = read_length;
}
} else {
file_offset = offset;
block_length = read_length;
}
/* while we still have data to read, we loop */
while (read_length > 0) {
gint64 read_return;
read_return =
gst_queue2_read_data_at_offset (queue, file_offset, block_length,
data);
if (read_return < 0) {
gst_buffer_unref (buf);
return read_return;
}
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset = (file_offset + read_return) % queue->ring_buffer_max_size;
} else {
file_offset += read_return;
}
data += read_return;
read_length -= read_return;
block_length = read_length;
remaining -= read_return;
queue->current->reading_pos += read_return;
}
GST_QUEUE2_SIGNAL_DEL (queue);
GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
}
GST_BUFFER_SIZE (buf) = length;
@ -1440,7 +1466,7 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
const guint rb_size = queue->ring_buffer_max_size;
guint8 *data;
guint64 writing_pos, reading_pos, new_writing_pos;
gint64 space;
gint64 space, rb_space;
GstQueue2Range *range, *prev;
writing_pos = queue->current->rb_writing_pos;
@ -1448,6 +1474,16 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
rem = buffer;
rb_space =
queue->ring_buffer_max_size - (queue->current->writing_pos -
queue->current->reading_pos);
while (rb_space <= 0) {
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
rb_space =
queue->ring_buffer_max_size - (queue->current->writing_pos -
queue->current->reading_pos);
}
/* loop if we can't write the whole buffer at once */
do {
/* calculate the space in the ring buffer not used by data from the
@ -1455,6 +1491,7 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
space =
MIN (queue->max_level.bytes,
queue->ring_buffer_max_size) - queue->cur_level.bytes;
space = MIN (space, rb_space);
rem_size = GST_BUFFER_SIZE (rem);
/* don't try to process 0 size buffers */
@ -1596,6 +1633,8 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
queue->current->writing_pos += buf_size;
queue->current->rb_writing_pos = writing_pos = new_writing_pos;
GST_QUEUE2_SIGNAL_ADD (queue);
update_cur_level (queue, queue->current);
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
queue->cur_level.bytes, MIN (queue->max_level.bytes,
@ -1619,6 +1658,9 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
} while (gst_queue2_is_filled (queue));
GST_DEBUG_OBJECT (queue, "flushed/space made");
rb_space =
queue->ring_buffer_max_size - (queue->current->writing_pos -
queue->current->reading_pos);
/* and continue if we were running before */
if (started)
@ -1737,7 +1779,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
else
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
GST_QUEUE2_SIGNAL_ADD (queue);
if (!QUEUE_IS_USING_RING_BUFFER (queue))
GST_QUEUE2_SIGNAL_ADD (queue);
}
return;
@ -1808,13 +1851,14 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
default:
break;
}
GST_QUEUE2_SIGNAL_DEL (queue);
} else {
g_warning
("Unexpected item %p dequeued from queue %s (refcounting problem?)",
item, GST_OBJECT_NAME (queue));
item = NULL;
GST_QUEUE2_SIGNAL_DEL (queue);
}
GST_QUEUE2_SIGNAL_DEL (queue);
return item;
@ -1837,7 +1881,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
case GST_EVENT_FLUSH_START:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
if (QUEUE_IS_USING_RING_BUFFER (queue)
if (!QUEUE_IS_USING_RING_BUFFER (queue)
|| !QUEUE_IS_USING_TEMP_FILE (queue)) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
@ -1862,7 +1906,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
if (QUEUE_IS_USING_RING_BUFFER (queue)
if (!QUEUE_IS_USING_RING_BUFFER (queue)
|| !QUEUE_IS_USING_TEMP_FILE (queue)) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
@ -1949,6 +1993,8 @@ gst_queue2_is_filled (GstQueue2 * queue)
if (queue->is_eos)
return TRUE;
#define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && (queue->cur_level.format) >= ((alt_max) ? MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
/* if using a ring buffer we're filled if all ring buffer space is used
* _by the current range_ */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
@ -1957,8 +2003,7 @@ gst_queue2_is_filled (GstQueue2 * queue)
GST_DEBUG_OBJECT (queue,
"max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u", max_bytes,
rb_size, queue->cur_level.bytes);
return queue->cur_level.bytes >= (max_bytes ? MIN (max_bytes,
rb_size) : rb_size);
return CHECK_FILLED (bytes, rb_size);
}
/* if using file, we're never filled if we don't have EOS */
@ -1969,16 +2014,14 @@ gst_queue2_is_filled (GstQueue2 * queue)
if (queue->cur_level.buffers == 0)
return FALSE;
#define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
(queue->cur_level.format) >= (queue->max_level.format))
/* we are filled if one of the current levels exceeds the max */
res = CHECK_FILLED (buffers) || CHECK_FILLED (bytes) || CHECK_FILLED (time);
res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
|| CHECK_FILLED (time, 0);
/* if we need to, use the rate estimate to check against the max time we are
* allowed to queue */
if (queue->use_rate_estimate)
res |= CHECK_FILLED (rate_time);
res |= CHECK_FILLED (rate_time, 0);
#undef CHECK_FILLED
return res;
@ -2471,16 +2514,11 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
GstFlowReturn ret;
queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
if (length > queue->ring_buffer_max_size) {
GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT,
(_("Buffer is too large to fit in ring buffer")),
("(%u > %" G_GUINT64_FORMAT ")", length, queue->ring_buffer_max_size));
return GST_FLOW_ERROR;
}
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
offset = (offset == -1) ? queue->current->reading_pos : offset;
queue->current->reading_pos = offset =
(offset == -1) ? queue->current->reading_pos : offset;
GST_DEBUG_OBJECT (queue,
"Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);