From 500d6a9986806f366ba6fb038c7edf031e6186f4 Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Sun, 6 Jun 2010 09:30:48 +0200 Subject: [PATCH] queue2: Clean up and improve code --- plugins/elements/gstqueue2.c | 184 +++++++++++++++++++++-------------- 1 file changed, 111 insertions(+), 73 deletions(-) diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index ef47f1ac75..7606be48d1 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -364,7 +364,7 @@ gst_queue2_class_init (GstQueue2Class * klass) */ g_object_class_install_property (gobject_class, PROP_USE_RING_BUFFER, g_param_spec_boolean ("use-ring-buffer", "Use a ring buffer", - "Use a ring buffer of size ring-buffer-max-size kB", + "Use a ring buffer of size ring-buffer-max-size bytes", DEFAULT_USE_RING_BUFFER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstQueue2:ring-buffer-max-size @@ -375,7 +375,8 @@ gst_queue2_class_init (GstQueue2Class * klass) * Since: 0.10.30 */ g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE, - g_param_spec_uint ("ring-buffer-max-size", "Max. ring buffer size (kB)", + g_param_spec_uint ("ring-buffer-max-size", + "Max. ring buffer size (bytes)", "Max. amount of data in the ring buffer (bytes, 0=unlimited)", DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); @@ -754,8 +755,7 @@ update_buffering (GstQueue2 * queue) if (!queue->use_buffering || queue->high_percent <= 0) return; -#define GET_PERCENT(format) ((queue->max_level.format) > 0 ? \ - (queue->cur_level.format) * 100 / (queue->max_level.format) : 0) +#define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0) if (queue->is_eos) { /* on EOS we are always 100% full, we set the var here so that it we can @@ -765,19 +765,17 @@ update_buffering (GstQueue2 * queue) /* figure out the percent we are filled, we take the max of all formats. */ if (!QUEUE_IS_USING_RING_BUFFER (queue)) { - percent = GET_PERCENT (bytes); + percent = GET_PERCENT (bytes, 0); } else { - guint max_bytes = queue->max_level.bytes; guint64 rb_size = queue->ring_buffer_max_size; - max_bytes = max_bytes ? MIN (max_bytes, rb_size) : rb_size; - percent = max_bytes ? queue->cur_level.bytes * 100 / max_bytes : 0; + percent = GET_PERCENT (bytes, rb_size); } - percent = MAX (percent, GET_PERCENT (time)); - percent = MAX (percent, GET_PERCENT (buffers)); + percent = MAX (percent, GET_PERCENT (time, 0)); + percent = MAX (percent, GET_PERCENT (buffers, 0)); /* also apply the rate estimate when we need to */ if (queue->use_rate_estimate) - percent = MAX (percent, GET_PERCENT (rate_time)); + percent = MAX (percent, GET_PERCENT (rate_time, 0)); } if (queue->is_buffering) { @@ -1099,7 +1097,7 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %" G_GUINT64_FORMAT, range, range->offset, range->writing_pos); - if (queue->is_eos) + if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos) return TRUE; if (offset + length <= range->writing_pos) @@ -1107,7 +1105,7 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) else GST_DEBUG_OBJECT (queue, "Need more data (%" G_GUINT64_FORMAT " bytes more)", - range->writing_pos - (offset + length)); + (offset + length) - range->writing_pos); } else { GST_INFO_OBJECT (queue, "not found in any range"); @@ -1122,8 +1120,8 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) } else if (offset < queue->current->writing_pos + 200000) { update_cur_pos (queue, queue->current, offset + length); GST_INFO_OBJECT (queue, "wait for data"); - return FALSE; } + return FALSE; } /* too far away, do a seek */ @@ -1133,7 +1131,7 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) return FALSE; } -static GstFlowReturn +static gint64 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length, guint8 * dst) { @@ -1165,7 +1163,7 @@ gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length, goto eos; } - return GST_FLOW_OK; + return res; seek_failed: { @@ -1188,53 +1186,81 @@ static GstFlowReturn gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, GstBuffer ** buffer) { - GstFlowReturn flow_ret; GstBuffer *buf; guint8 *data; guint64 file_offset; - guint block_length; - - /* check if we have enough data at @offset. If there is not enough data, we - * block and wait. */ - while (!gst_queue2_have_data (queue, offset, length)) { - GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing); - } - - if (QUEUE_IS_USING_RING_BUFFER (queue)) { - file_offset = - (queue->current->rb_offset + (offset - - queue->current->offset)) % queue->ring_buffer_max_size; - if (file_offset + length > queue->ring_buffer_max_size) { - block_length = queue->ring_buffer_max_size - file_offset; - } else { - block_length = length; - } - } else { - file_offset = offset; - block_length = length; - } + guint block_length, remaining, read_length; + /* allocate the output buffer of the requested size */ buf = gst_buffer_new_and_alloc (length); data = GST_BUFFER_DATA (buf); - if ((flow_ret = - gst_queue2_read_data_at_offset (queue, file_offset, block_length, - data)) != GST_FLOW_OK) { - gst_buffer_unref (buf); - return flow_ret; - } + GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length, + offset); - if (block_length < length) { - /* read second block into a second buffer, then merge the two */ - data += block_length; - block_length = length - block_length; - - if ((flow_ret = - gst_queue2_read_data_at_offset (queue, 0, block_length, - data)) != GST_FLOW_OK) { - gst_buffer_unref (buf); - return flow_ret; + remaining = length; + while (remaining > 0) { + /* configure how much/whether to read */ + if (!gst_queue2_have_data (queue, offset, length)) { + if (QUEUE_IS_USING_RING_BUFFER (queue) + && queue->current->writing_pos - queue->current->reading_pos >= + queue->ring_buffer_max_size) { + /* we don't have the data but if we have a ring buffer that is full, we + * need to read */ + GST_DEBUG_OBJECT (queue, + "ring buffer full, reading ring-buffer-max-size %d bytes", + queue->ring_buffer_max_size); + read_length = queue->ring_buffer_max_size; + } else { + GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing); + continue; + } + } else { + /* we have the requested data so read it */ + read_length = remaining; } + + /* congfigure how much and from where to read */ + if (QUEUE_IS_USING_RING_BUFFER (queue)) { + file_offset = + (queue->current->rb_offset + (offset - + queue->current->offset)) % queue->ring_buffer_max_size; + if (file_offset + read_length > queue->ring_buffer_max_size) { + block_length = queue->ring_buffer_max_size - file_offset; + } else { + block_length = read_length; + } + } else { + file_offset = offset; + block_length = read_length; + } + + /* while we still have data to read, we loop */ + while (read_length > 0) { + gint64 read_return; + + read_return = + gst_queue2_read_data_at_offset (queue, file_offset, block_length, + data); + if (read_return < 0) { + gst_buffer_unref (buf); + return read_return; + } + + if (QUEUE_IS_USING_RING_BUFFER (queue)) { + file_offset = (file_offset + read_return) % queue->ring_buffer_max_size; + } else { + file_offset += read_return; + } + + data += read_return; + read_length -= read_return; + block_length = read_length; + remaining -= read_return; + queue->current->reading_pos += read_return; + } + GST_QUEUE2_SIGNAL_DEL (queue); + GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining); } GST_BUFFER_SIZE (buf) = length; @@ -1440,7 +1466,7 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) const guint rb_size = queue->ring_buffer_max_size; guint8 *data; guint64 writing_pos, reading_pos, new_writing_pos; - gint64 space; + gint64 space, rb_space; GstQueue2Range *range, *prev; writing_pos = queue->current->rb_writing_pos; @@ -1448,6 +1474,16 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) rem = buffer; + rb_space = + queue->ring_buffer_max_size - (queue->current->writing_pos - + queue->current->reading_pos); + while (rb_space <= 0) { + GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); + rb_space = + queue->ring_buffer_max_size - (queue->current->writing_pos - + queue->current->reading_pos); + } + /* loop if we can't write the whole buffer at once */ do { /* calculate the space in the ring buffer not used by data from the @@ -1455,6 +1491,7 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) space = MIN (queue->max_level.bytes, queue->ring_buffer_max_size) - queue->cur_level.bytes; + space = MIN (space, rb_space); rem_size = GST_BUFFER_SIZE (rem); /* don't try to process 0 size buffers */ @@ -1596,6 +1633,8 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) queue->current->writing_pos += buf_size; queue->current->rb_writing_pos = writing_pos = new_writing_pos; + GST_QUEUE2_SIGNAL_ADD (queue); + update_cur_level (queue, queue->current); GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)", queue->cur_level.bytes, MIN (queue->max_level.bytes, @@ -1619,6 +1658,9 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); } while (gst_queue2_is_filled (queue)); GST_DEBUG_OBJECT (queue, "flushed/space made"); + rb_space = + queue->ring_buffer_max_size - (queue->current->writing_pos - + queue->current->reading_pos); /* and continue if we were running before */ if (started) @@ -1737,7 +1779,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) else gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); - GST_QUEUE2_SIGNAL_ADD (queue); + if (!QUEUE_IS_USING_RING_BUFFER (queue)) + GST_QUEUE2_SIGNAL_ADD (queue); } return; @@ -1808,13 +1851,14 @@ gst_queue2_locked_dequeue (GstQueue2 * queue) default: break; } + GST_QUEUE2_SIGNAL_DEL (queue); } else { g_warning ("Unexpected item %p dequeued from queue %s (refcounting problem?)", item, GST_OBJECT_NAME (queue)); item = NULL; + GST_QUEUE2_SIGNAL_DEL (queue); } - GST_QUEUE2_SIGNAL_DEL (queue); return item; @@ -1837,7 +1881,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) case GST_EVENT_FLUSH_START: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event"); - if (QUEUE_IS_USING_RING_BUFFER (queue) + if (!QUEUE_IS_USING_RING_BUFFER (queue) || !QUEUE_IS_USING_TEMP_FILE (queue)) { /* forward event */ gst_pad_push_event (queue->srcpad, event); @@ -1862,7 +1906,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event"); - if (QUEUE_IS_USING_RING_BUFFER (queue) + if (!QUEUE_IS_USING_RING_BUFFER (queue) || !QUEUE_IS_USING_TEMP_FILE (queue)) { /* forward event */ gst_pad_push_event (queue->srcpad, event); @@ -1949,6 +1993,8 @@ gst_queue2_is_filled (GstQueue2 * queue) if (queue->is_eos) return TRUE; +#define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && (queue->cur_level.format) >= ((alt_max) ? MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format))) + /* if using a ring buffer we're filled if all ring buffer space is used * _by the current range_ */ if (QUEUE_IS_USING_RING_BUFFER (queue)) { @@ -1957,8 +2003,7 @@ gst_queue2_is_filled (GstQueue2 * queue) GST_DEBUG_OBJECT (queue, "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u", max_bytes, rb_size, queue->cur_level.bytes); - return queue->cur_level.bytes >= (max_bytes ? MIN (max_bytes, - rb_size) : rb_size); + return CHECK_FILLED (bytes, rb_size); } /* if using file, we're never filled if we don't have EOS */ @@ -1969,16 +2014,14 @@ gst_queue2_is_filled (GstQueue2 * queue) if (queue->cur_level.buffers == 0) return FALSE; -#define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \ - (queue->cur_level.format) >= (queue->max_level.format)) - /* we are filled if one of the current levels exceeds the max */ - res = CHECK_FILLED (buffers) || CHECK_FILLED (bytes) || CHECK_FILLED (time); + res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0) + || CHECK_FILLED (time, 0); /* if we need to, use the rate estimate to check against the max time we are * allowed to queue */ if (queue->use_rate_estimate) - res |= CHECK_FILLED (rate_time); + res |= CHECK_FILLED (rate_time, 0); #undef CHECK_FILLED return res; @@ -2471,16 +2514,11 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length, GstFlowReturn ret; queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad)); - if (length > queue->ring_buffer_max_size) { - GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, - (_("Buffer is too large to fit in ring buffer")), - ("(%u > %" G_GUINT64_FORMAT ")", length, queue->ring_buffer_max_size)); - return GST_FLOW_ERROR; - } GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); length = (length == -1) ? DEFAULT_BUFFER_SIZE : length; - offset = (offset == -1) ? queue->current->reading_pos : offset; + queue->current->reading_pos = offset = + (offset == -1) ? queue->current->reading_pos : offset; GST_DEBUG_OBJECT (queue, "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);