queue2: refactorings

Check and handle seek errors
Refactor the wait_free_space function.
This commit is contained in:
Wim Taymans 2010-06-15 12:37:33 +02:00
parent c847b981f4
commit 6339bd0bec

View file

@ -942,11 +942,11 @@ update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
}
#ifdef HAVE_FSEEKO
#define FSEEK_FILE(file, offset) (fseeko (file, (off_t) offset, SEEK_SET))
#define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0)
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
#define FSEEK_FILE(file, offset) (lseek (fileno (file), (off_t) offset, SEEK_SET))
#define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
#else
#define FSEEK_FILE(file, offset) (fseek (file, offset, SEEK_SET))
#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0)
#endif
static gboolean
@ -960,7 +960,8 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
FSEEK_FILE (queue->temp_file, writing_pos);
if (FSEEK_FILE (queue->temp_file, writing_pos))
goto seek_failed;
data = GST_BUFFER_DATA (buffer);
size = GST_BUFFER_SIZE (buffer);
@ -1005,6 +1006,11 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
return TRUE;
/* ERRORS */
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
handle_error:
{
switch (errno) {
@ -1124,17 +1130,8 @@ gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
{
size_t res;
#ifdef HAVE_FSEEKO
if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
if (FSEEK_FILE (queue->temp_file, offset))
goto seek_failed;
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
if (lseek (fileno (queue->temp_file), (off_t) offset,
SEEK_SET) == (off_t) - 1)
goto seek_failed;
#else
if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0)
goto seek_failed;
#endif
/* this should not block */
GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
@ -1154,7 +1151,7 @@ gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
return GST_FLOW_ERROR;
}
could_not_read:
@ -1445,6 +1442,41 @@ gst_queue2_locked_flush (GstQueue2 * queue)
GST_QUEUE2_SIGNAL_DEL (queue);
}
static gboolean
gst_queue2_wait_free_space (GstQueue2 * queue)
{
/* We make space available if we're "full" according to whatever
* the user defined as "full". */
if (gst_queue2_is_filled (queue)) {
gboolean started;
/* pause the timer while we wait. The fact that we are waiting does not mean
* the byterate on the input pad is lower */
if ((started = queue->in_timer_started))
g_timer_stop (queue->in_timer);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"queue is full, waiting for free space");
do {
/* Wait for space to be available, we could be unlocked because of a flush. */
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
}
while (gst_queue2_is_filled (queue));
/* and continue if we were running before */
if (started)
g_timer_continue (queue->in_timer);
}
return TRUE;
/* ERRORS */
out_flushing:
{
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
return FALSE;
}
}
static gboolean
gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
{
@ -1589,7 +1621,8 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
}
}
FSEEK_FILE (queue->temp_file, writing_pos);
if (FSEEK_FILE (queue->temp_file, writing_pos))
goto seek_failed;
if (new_writing_pos > writing_pos) {
/* no wrapping, just write */
@ -1606,7 +1639,8 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
if (fwrite (data, block_one, 1, queue->temp_file) != 1)
goto handle_error;
FSEEK_FILE (queue->temp_file, 0);
if (FSEEK_FILE (queue->temp_file, 0))
goto seek_failed;
data += block_one;
if (fwrite (data, block_two, 1, queue->temp_file) != 1)
@ -1630,28 +1664,14 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
/* if we have a remainder of the buffer data, wait until there's space to
* write before looping */
if (rem_size) {
gboolean started;
if (!gst_queue2_wait_free_space (queue))
goto out_flushing;
/* pause the timer while we wait. The fact that we are waiting does not mean
* the byterate on the input pad is lower */
if ((started = queue->in_timer_started))
g_timer_stop (queue->in_timer);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"queue is full, waiting for free space");
GST_DEBUG_OBJECT (queue, "ring buffer full, waiting for space");
do {
/* Wait for space to be available, we could be unlocked because of a flush. */
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
} while (gst_queue2_is_filled (queue));
GST_DEBUG_OBJECT (queue, "flushed/space made");
rb_space =
queue->ring_buffer_max_size - (queue->current->writing_pos -
queue->current->reading_pos);
/* and continue if we were running before */
if (started)
g_timer_continue (queue->in_timer);
}
} while (rem_size);
@ -1664,6 +1684,11 @@ out_flushing:
/* FIXME - GST_FLOW_UNEXPECTED ? */
return FALSE;
}
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
handle_error:
{
switch (errno) {
@ -1980,7 +2005,9 @@ gst_queue2_is_filled (GstQueue2 * queue)
if (queue->is_eos)
return TRUE;
#define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && (queue->cur_level.format) >= ((alt_max) ? MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
#define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
(queue->cur_level.format) >= ((alt_max) ? \
MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
/* if using a ring buffer we're filled if all ring buffer space is used
* _by the current range_ */
@ -2036,28 +2063,8 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
if (queue->unexpected)
goto out_unexpected;
/* We make space available if we're "full" according to whatever
* the user defined as "full". */
if (gst_queue2_is_filled (queue)) {
gboolean started;
/* pause the timer while we wait. The fact that we are waiting does not mean
* the byterate on the input pad is lower */
if ((started = queue->in_timer_started))
g_timer_stop (queue->in_timer);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"queue is full, waiting for free space");
do {
/* Wait for space to be available, we could be unlocked because of a flush. */
GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
}
while (gst_queue2_is_filled (queue));
/* and continue if we were running before */
if (started)
g_timer_continue (queue->in_timer);
}
if (!gst_queue2_wait_free_space (queue))
goto out_flushing;
/* put buffer in queue now */
gst_queue2_locked_enqueue (queue, buffer);