queue2; cleanups and fixes

Make a macro for some frequent checks
Emit the removed signal in all cases when we remove something
This commit is contained in:
Wim Taymans 2010-06-15 16:12:02 +02:00
parent 6339bd0bec
commit 73e27fb017

View file

@ -97,6 +97,7 @@ enum
#define DEFAULT_BUFFER_SIZE 4096 #define DEFAULT_BUFFER_SIZE 4096
#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL) #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer) /* for consistency with the above macro */ #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer) /* for consistency with the above macro */
#define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
/* default property values */ /* default property values */
#define DEFAULT_MAX_SIZE_BUFFERS 100 /* 100 buffers */ #define DEFAULT_MAX_SIZE_BUFFERS 100 /* 100 buffers */
@ -148,7 +149,7 @@ enum
queue->max_level.bytes, \ queue->max_level.bytes, \
queue->cur_level.time, \ queue->cur_level.time, \
queue->max_level.time, \ queue->max_level.time, \
(guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \ (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
queue->current->writing_pos - queue->current->max_reading_pos : \ queue->current->writing_pos - queue->current->max_reading_pos : \
queue->queue->length)) queue->queue->length))
@ -792,7 +793,7 @@ update_buffering (GstQueue2 * queue)
queue->buffering_percent = percent; queue->buffering_percent = percent;
if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) { if (!QUEUE_IS_USING_QUEUE (queue)) {
GstFormat fmt = GST_FORMAT_BYTES; GstFormat fmt = GST_FORMAT_BYTES;
gint64 duration; gint64 duration;
@ -1105,11 +1106,12 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
/* we don't have the range, see how far away we are, FIXME, find a good /* we don't have the range, see how far away we are, FIXME, find a good
* threshold based on the incomming rate. */ * threshold based on the incomming rate. */
if (!queue->is_eos && queue->current) { if (!queue->is_eos && queue->current) {
if (QUEUE_IS_USING_RING_BUFFER (queue) && (offset < queue->current->offset if (QUEUE_IS_USING_RING_BUFFER (queue)) {
|| offset > if (offset < queue->current->offset || offset >
queue->current->writing_pos + queue->max_level.bytes - queue->current->writing_pos + queue->max_level.bytes -
queue->cur_level.bytes)) { queue->cur_level.bytes) {
perform_seek_to_offset (queue, offset); perform_seek_to_offset (queue, offset);
}
} else if (offset < queue->current->writing_pos + 200000) { } else if (offset < queue->current->writing_pos + 200000) {
update_cur_pos (queue, queue->current, offset + length); update_cur_pos (queue, queue->current, offset + length);
GST_INFO_OBJECT (queue, "wait for data"); GST_INFO_OBJECT (queue, "wait for data");
@ -1174,6 +1176,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
guint8 *data; guint8 *data;
guint64 file_offset; guint64 file_offset;
guint block_length, remaining, read_length; guint block_length, remaining, read_length;
gint64 read_return;
/* allocate the output buffer of the requested size */ /* allocate the output buffer of the requested size */
buf = gst_buffer_new_and_alloc (length); buf = gst_buffer_new_and_alloc (length);
@ -1221,21 +1224,15 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
/* while we still have data to read, we loop */ /* while we still have data to read, we loop */
while (read_length > 0) { while (read_length > 0) {
gint64 read_return;
read_return = read_return =
gst_queue2_read_data_at_offset (queue, file_offset, block_length, gst_queue2_read_data_at_offset (queue, file_offset, block_length,
data); data);
if (read_return < 0) { if (read_return < 0)
gst_buffer_unref (buf); goto read_error;
return read_return;
}
if (QUEUE_IS_USING_RING_BUFFER (queue)) { file_offset += read_return;
file_offset = (file_offset + read_return) % queue->ring_buffer_max_size; if (QUEUE_IS_USING_RING_BUFFER (queue))
} else { file_offset %= queue->ring_buffer_max_size;
file_offset += read_return;
}
data += read_return; data += read_return;
read_length -= read_return; read_length -= read_return;
@ -1261,6 +1258,12 @@ out_flushing:
GST_DEBUG_OBJECT (queue, "we are flushing"); GST_DEBUG_OBJECT (queue, "we are flushing");
return GST_FLOW_WRONG_STATE; return GST_FLOW_WRONG_STATE;
} }
read_error:
{
GST_DEBUG_OBJECT (queue, "we have a read error");
gst_buffer_unref (buf);
return read_return;
}
} }
/* should be called with QUEUE_LOCK */ /* should be called with QUEUE_LOCK */
@ -1282,6 +1285,7 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
ret = ret =
gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE, gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
&buffer); &buffer);
switch (ret) { switch (ret) {
case GST_FLOW_OK: case GST_FLOW_OK:
item = GST_MINI_OBJECT_CAST (buffer); item = GST_MINI_OBJECT_CAST (buffer);
@ -1419,7 +1423,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
static void static void
gst_queue2_locked_flush (GstQueue2 * queue) gst_queue2_locked_flush (GstQueue2 * queue)
{ {
if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) { if (!QUEUE_IS_USING_QUEUE (queue)) {
gst_queue2_flush_temp_file (queue); gst_queue2_flush_temp_file (queue);
} else { } else {
while (!g_queue_is_empty (queue->queue)) { while (!g_queue_is_empty (queue->queue)) {
@ -1493,15 +1497,16 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
rem = buffer; rem = buffer;
rb_space = do {
queue->ring_buffer_max_size - (queue->current->writing_pos -
queue->current->reading_pos);
while (rb_space <= 0) {
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
rb_space = rb_space =
queue->ring_buffer_max_size - (queue->current->writing_pos - queue->ring_buffer_max_size - (queue->current->writing_pos -
queue->current->reading_pos); queue->current->reading_pos);
}
if (rb_space > 0)
break;
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
} while (TRUE);
/* loop if we can't write the whole buffer at once */ /* loop if we can't write the whole buffer at once */
do { do {
@ -1718,8 +1723,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
size = GST_BUFFER_SIZE (buffer); size = GST_BUFFER_SIZE (buffer);
/* add buffer to the statistics */ /* add buffer to the statistics */
if (!(QUEUE_IS_USING_TEMP_FILE (queue) if (QUEUE_IS_USING_QUEUE (queue)) {
|| QUEUE_IS_USING_RING_BUFFER (queue))) {
queue->cur_level.buffers++; queue->cur_level.buffers++;
queue->cur_level.bytes += size; queue->cur_level.bytes += size;
} }
@ -1753,8 +1757,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
apply_segment (queue, event, &queue->sink_segment); apply_segment (queue, event, &queue->sink_segment);
/* This is our first new segment, we hold it /* This is our first new segment, we hold it
* as we can't save it on the temp file */ * as we can't save it on the temp file */
if (QUEUE_IS_USING_RING_BUFFER (queue) if (!QUEUE_IS_USING_QUEUE (queue)) {
|| QUEUE_IS_USING_TEMP_FILE (queue)) {
if (queue->segment_event_received) if (queue->segment_event_received)
goto unexpected_event; goto unexpected_event;
@ -1769,8 +1772,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
queue->unexpected = FALSE; queue->unexpected = FALSE;
break; break;
default: default:
if (QUEUE_IS_USING_RING_BUFFER (queue) if (!QUEUE_IS_USING_QUEUE (queue))
|| QUEUE_IS_USING_TEMP_FILE (queue))
goto unexpected_event; goto unexpected_event;
break; break;
} }
@ -1785,8 +1787,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
/* update the buffering status */ /* update the buffering status */
update_buffering (queue); update_buffering (queue);
if (!(QUEUE_IS_USING_TEMP_FILE (queue) if (QUEUE_IS_USING_QUEUE (queue))
|| QUEUE_IS_USING_RING_BUFFER (queue)))
g_queue_push_tail (queue->queue, item); g_queue_push_tail (queue->queue, item);
else else
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
@ -1815,7 +1816,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
{ {
GstMiniObject *item; GstMiniObject *item;
if (QUEUE_IS_USING_TEMP_FILE (queue) || QUEUE_IS_USING_RING_BUFFER (queue)) if (!QUEUE_IS_USING_QUEUE (queue))
item = gst_queue2_read_item_from_file (queue); item = gst_queue2_read_item_from_file (queue);
else else
item = g_queue_pop_head (queue->queue); item = g_queue_pop_head (queue->queue);
@ -1833,8 +1834,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
GST_CAT_LOG_OBJECT (queue_dataflow, queue, GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved buffer %p from queue", buffer); "retrieved buffer %p from queue", buffer);
if (!(QUEUE_IS_USING_TEMP_FILE (queue) if (QUEUE_IS_USING_QUEUE (queue)) {
|| QUEUE_IS_USING_RING_BUFFER (queue))) {
queue->cur_level.buffers--; queue->cur_level.buffers--;
queue->cur_level.bytes -= size; queue->cur_level.bytes -= size;
} }
@ -1863,14 +1863,13 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
default: default:
break; break;
} }
GST_QUEUE2_SIGNAL_DEL (queue);
} else { } else {
g_warning g_warning
("Unexpected item %p dequeued from queue %s (refcounting problem?)", ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
item, GST_OBJECT_NAME (queue)); item, GST_OBJECT_NAME (queue));
item = NULL; item = NULL;
GST_QUEUE2_SIGNAL_DEL (queue);
} }
GST_QUEUE2_SIGNAL_DEL (queue);
return item; return item;
@ -1893,8 +1892,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
{ {
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event"); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
if (!QUEUE_IS_USING_RING_BUFFER (queue) if (QUEUE_IS_USING_QUEUE (queue)) {
|| !QUEUE_IS_USING_TEMP_FILE (queue)) {
/* forward event */ /* forward event */
gst_pad_push_event (queue->srcpad, event); gst_pad_push_event (queue->srcpad, event);
@ -1918,8 +1916,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
{ {
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event"); GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
if (!QUEUE_IS_USING_RING_BUFFER (queue) if (QUEUE_IS_USING_QUEUE (queue)) {
|| !QUEUE_IS_USING_TEMP_FILE (queue)) {
/* forward event */ /* forward event */
gst_pad_push_event (queue->srcpad, event); gst_pad_push_event (queue->srcpad, event);
@ -1985,8 +1982,7 @@ gst_queue2_is_empty (GstQueue2 * queue)
if (queue->is_eos) if (queue->is_eos)
return FALSE; return FALSE;
if ((QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
&& queue->current) {
return queue->current->writing_pos <= queue->current->max_reading_pos; return queue->current->writing_pos <= queue->current->max_reading_pos;
} else { } else {
if (queue->queue->length == 0) if (queue->queue->length == 0)
@ -2282,8 +2278,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
if (QUEUE_IS_USING_RING_BUFFER (queue) if (QUEUE_IS_USING_QUEUE (queue)) {
|| !QUEUE_IS_USING_TEMP_FILE (queue)) {
/* just forward upstream */ /* just forward upstream */
res = gst_pad_push_event (queue->sinkpad, event); res = gst_pad_push_event (queue->sinkpad, event);
} else { } else {
@ -2300,8 +2295,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
} }
break; break;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
if (QUEUE_IS_USING_RING_BUFFER (queue) if (QUEUE_IS_USING_QUEUE (queue)) {
|| !QUEUE_IS_USING_TEMP_FILE (queue)) {
/* just forward upstream */ /* just forward upstream */
res = gst_pad_push_event (queue->sinkpad, event); res = gst_pad_push_event (queue->sinkpad, event);
} else { } else {
@ -2391,8 +2385,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
GST_DEBUG_OBJECT (queue, "query buffering"); GST_DEBUG_OBJECT (queue, "query buffering");
/* FIXME - is this condition correct? what should ring buffer do? */ /* FIXME - is this condition correct? what should ring buffer do? */
if (!(QUEUE_IS_USING_RING_BUFFER (queue) if (QUEUE_IS_USING_QUEUE (queue)) {
|| QUEUE_IS_USING_TEMP_FILE (queue))) {
/* no temp file, just forward to the peer */ /* no temp file, just forward to the peer */
if (!gst_queue2_peer_query (queue, queue->sinkpad, query)) if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
goto peer_failed; goto peer_failed;
@ -2546,7 +2539,7 @@ gst_queue2_src_checkgetrange_function (GstPad * pad)
queue = GST_QUEUE2 (gst_pad_get_parent (pad)); queue = GST_QUEUE2 (gst_pad_get_parent (pad));
/* we can operate in pull mode when we are using a tempfile */ /* we can operate in pull mode when we are using a tempfile */
ret = QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue); ret = !QUEUE_IS_USING_QUEUE (queue);
gst_object_unref (GST_OBJECT (queue)); gst_object_unref (GST_OBJECT (queue));
@ -2634,7 +2627,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
queue = GST_QUEUE2 (gst_pad_get_parent (pad)); queue = GST_QUEUE2 (gst_pad_get_parent (pad));
if (active) { if (active) {
if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) { if (!QUEUE_IS_USING_QUEUE (queue)) {
/* open the temp file now */ /* open the temp file now */
result = gst_queue2_open_temp_location_file (queue); result = gst_queue2_open_temp_location_file (queue);
@ -2682,8 +2675,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY: case GST_STATE_CHANGE_NULL_TO_READY:
break; break;
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
if (QUEUE_IS_USING_RING_BUFFER (queue) if (!QUEUE_IS_USING_QUEUE (queue)) {
|| QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!gst_queue2_open_temp_location_file (queue)) if (!gst_queue2_open_temp_location_file (queue))
ret = GST_STATE_CHANGE_FAILURE; ret = GST_STATE_CHANGE_FAILURE;
} }
@ -2708,8 +2700,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PLAYING_TO_PAUSED: case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break; break;
case GST_STATE_CHANGE_PAUSED_TO_READY: case GST_STATE_CHANGE_PAUSED_TO_READY:
if (QUEUE_IS_USING_RING_BUFFER (queue) if (!QUEUE_IS_USING_QUEUE (queue))
|| QUEUE_IS_USING_TEMP_FILE (queue))
gst_queue2_close_temp_location_file (queue); gst_queue2_close_temp_location_file (queue);
if (queue->starting_segment != NULL) { if (queue->starting_segment != NULL) {
gst_event_unref (queue->starting_segment); gst_event_unref (queue->starting_segment);