diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index d1c04dd3a3..07bd69d4bc 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -942,11 +942,11 @@ update_cur_level (GstQueue2 * queue, GstQueue2Range * range) } #ifdef HAVE_FSEEKO -#define FSEEK_FILE(file, offset) (fseeko (file, (off_t) offset, SEEK_SET)) +#define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0) #elif defined (G_OS_UNIX) || defined (G_OS_WIN32) -#define FSEEK_FILE(file, offset) (lseek (fileno (file), (off_t) offset, SEEK_SET)) +#define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1) #else -#define FSEEK_FILE(file, offset) (fseek (file, offset, SEEK_SET)) +#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0) #endif static gboolean @@ -960,7 +960,8 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) writing_pos = queue->current->writing_pos; max_reading_pos = queue->current->max_reading_pos; - FSEEK_FILE (queue->temp_file, writing_pos); + if (FSEEK_FILE (queue->temp_file, writing_pos)) + goto seek_failed; data = GST_BUFFER_DATA (buffer); size = GST_BUFFER_SIZE (buffer); @@ -1005,6 +1006,11 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) return TRUE; /* ERRORS */ +seek_failed: + { + GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM); + return FALSE; + } handle_error: { switch (errno) { @@ -1124,17 +1130,8 @@ gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length, { size_t res; -#ifdef HAVE_FSEEKO - if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0) + if (FSEEK_FILE (queue->temp_file, offset)) goto seek_failed; -#elif defined (G_OS_UNIX) || defined (G_OS_WIN32) - if (lseek (fileno (queue->temp_file), (off_t) offset, - SEEK_SET) == (off_t) - 1) - goto seek_failed; -#else - if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0) - goto seek_failed; -#endif /* this should not block */ GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT, @@ -1154,7 +1151,7 @@ gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length, seek_failed: { - GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM); return GST_FLOW_ERROR; } could_not_read: @@ -1445,6 +1442,41 @@ gst_queue2_locked_flush (GstQueue2 * queue) GST_QUEUE2_SIGNAL_DEL (queue); } +static gboolean +gst_queue2_wait_free_space (GstQueue2 * queue) +{ + /* We make space available if we're "full" according to whatever + * the user defined as "full". */ + if (gst_queue2_is_filled (queue)) { + gboolean started; + + /* pause the timer while we wait. The fact that we are waiting does not mean + * the byterate on the input pad is lower */ + if ((started = queue->in_timer_started)) + g_timer_stop (queue->in_timer); + + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, + "queue is full, waiting for free space"); + do { + /* Wait for space to be available, we could be unlocked because of a flush. */ + GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); + } + while (gst_queue2_is_filled (queue)); + + /* and continue if we were running before */ + if (started) + g_timer_continue (queue->in_timer); + } + return TRUE; + + /* ERRORS */ +out_flushing: + { + GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing"); + return FALSE; + } +} + static gboolean gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) { @@ -1589,7 +1621,8 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) } } - FSEEK_FILE (queue->temp_file, writing_pos); + if (FSEEK_FILE (queue->temp_file, writing_pos)) + goto seek_failed; if (new_writing_pos > writing_pos) { /* no wrapping, just write */ @@ -1606,7 +1639,8 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) if (fwrite (data, block_one, 1, queue->temp_file) != 1) goto handle_error; - FSEEK_FILE (queue->temp_file, 0); + if (FSEEK_FILE (queue->temp_file, 0)) + goto seek_failed; data += block_one; if (fwrite (data, block_two, 1, queue->temp_file) != 1) @@ -1630,28 +1664,14 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) /* if we have a remainder of the buffer data, wait until there's space to * write before looping */ if (rem_size) { - gboolean started; + if (!gst_queue2_wait_free_space (queue)) + goto out_flushing; - /* pause the timer while we wait. The fact that we are waiting does not mean - * the byterate on the input pad is lower */ - if ((started = queue->in_timer_started)) - g_timer_stop (queue->in_timer); - - GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, - "queue is full, waiting for free space"); - GST_DEBUG_OBJECT (queue, "ring buffer full, waiting for space"); - do { - /* Wait for space to be available, we could be unlocked because of a flush. */ - GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); - } while (gst_queue2_is_filled (queue)); GST_DEBUG_OBJECT (queue, "flushed/space made"); + rb_space = queue->ring_buffer_max_size - (queue->current->writing_pos - queue->current->reading_pos); - - /* and continue if we were running before */ - if (started) - g_timer_continue (queue->in_timer); } } while (rem_size); @@ -1664,6 +1684,11 @@ out_flushing: /* FIXME - GST_FLOW_UNEXPECTED ? */ return FALSE; } +seek_failed: + { + GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM); + return FALSE; + } handle_error: { switch (errno) { @@ -1980,7 +2005,9 @@ gst_queue2_is_filled (GstQueue2 * queue) if (queue->is_eos) return TRUE; -#define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && (queue->cur_level.format) >= ((alt_max) ? MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format))) +#define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \ + (queue->cur_level.format) >= ((alt_max) ? \ + MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format))) /* if using a ring buffer we're filled if all ring buffer space is used * _by the current range_ */ @@ -2036,28 +2063,8 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer) if (queue->unexpected) goto out_unexpected; - /* We make space available if we're "full" according to whatever - * the user defined as "full". */ - if (gst_queue2_is_filled (queue)) { - gboolean started; - - /* pause the timer while we wait. The fact that we are waiting does not mean - * the byterate on the input pad is lower */ - if ((started = queue->in_timer_started)) - g_timer_stop (queue->in_timer); - - GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, - "queue is full, waiting for free space"); - do { - /* Wait for space to be available, we could be unlocked because of a flush. */ - GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); - } - while (gst_queue2_is_filled (queue)); - - /* and continue if we were running before */ - if (started) - g_timer_continue (queue->in_timer); - } + if (!gst_queue2_wait_free_space (queue)) + goto out_flushing; /* put buffer in queue now */ gst_queue2_locked_enqueue (queue, buffer);