gst/playback/gstqueue2.c: Add support for filebased buffering. Fixes #441264.

Original commit message from CVS:
Based on patch by: Thiago Sousa Santos <thiagossantos at gmail dot com>
* gst/playback/gstqueue2.c: (gst_queue_class_init),
(gst_queue_init), (gst_queue_finalize),
(gst_queue_write_buffer_to_file), (gst_queue_have_data),
(gst_queue_create_read), (gst_queue_read_item_from_file),
(gst_queue_open_temp_location_file),
(gst_queue_close_temp_location_file), (gst_queue_locked_flush),
(gst_queue_locked_enqueue), (gst_queue_locked_dequeue),
(gst_queue_is_empty), (gst_queue_is_filled),
(gst_queue_change_state), (gst_queue_set_temp_location),
(gst_queue_set_property):
Add support for filebased buffering. Fixes #441264.
This commit is contained in:
Thiago Sousa Santos 2007-06-05 16:14:23 +00:00 committed by Wim Taymans
parent 3840b5a20f
commit 73e8934af9
2 changed files with 330 additions and 15 deletions

View file

@ -1,3 +1,19 @@
2007-06-05 Wim Taymans <wim@fluendo.com>
Based on patch by: Thiago Sousa Santos <thiagossantos at gmail dot com>
* gst/playback/gstqueue2.c: (gst_queue_class_init),
(gst_queue_init), (gst_queue_finalize),
(gst_queue_write_buffer_to_file), (gst_queue_have_data),
(gst_queue_create_read), (gst_queue_read_item_from_file),
(gst_queue_open_temp_location_file),
(gst_queue_close_temp_location_file), (gst_queue_locked_flush),
(gst_queue_locked_enqueue), (gst_queue_locked_dequeue),
(gst_queue_is_empty), (gst_queue_is_filled),
(gst_queue_change_state), (gst_queue_set_temp_location),
(gst_queue_set_property):
Add support for filebased buffering. Fixes #441264.
2007-06-05 Wim Taymans <wim@fluendo.com>
* gst/playback/gstdecodebin2.c: (gst_decode_bin_factory_filter),

View file

@ -1,9 +1,8 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
* 2003 Colin Walters <cwalters@gnome.org>
* 2005 Wim Taymans <wim@fluendo.com>
* 2007 Wim Taymans <wim@fluendo.com>
* 2000,2005,2007 Wim Taymans <wim@fluendo.com>
* 2007 Thiago Sousa Santos <thiagossantos at gmail dot com>
*
* gstqueue2.c:
*
@ -41,12 +40,20 @@
*
* The default queue size limits are 100 buffers, 2MB of data, or
* two seconds worth of data, whichever is reached first.
*
* If you set temp-location, the element will buffer data on the file
* specified by it. By using this, it will buffer the entire
* stream data on the file independently of the queue size limits, they
* will only be used for buffering statistics.
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <glib/gstdio.h>
#include <gst/gst.h>
#include <gst/gst-i18n-plugin.h>
@ -83,6 +90,10 @@ enum
#define DEFAULT_LOW_PERCENT 10
#define DEFAULT_HIGH_PERCENT 99
/* other defines */
#define DEFAULT_BUFFER_SIZE 4096
#define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL)
enum
{
PROP_0,
@ -176,6 +187,13 @@ struct _GstQueue
/* temp location stuff */
gchar *temp_location;
FILE *temp_file;
guint64 writing_pos;
guint64 reading_pos;
/* we need this to send the first new segment event of the stream
* because we can't save it on the file */
gboolean segment_event_received;
GstEvent *starting_segment;
};
struct _GstQueueClass
@ -195,7 +213,9 @@ struct _GstQueueClass
queue->max_level.bytes, \
queue->cur_level.time, \
queue->max_level.time, \
queue->queue->length)
QUEUE_IS_USING_TEMP_FILE(queue) ? \
queue->writing_pos - queue->reading_pos : \
queue->queue->length)
#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
g_mutex_lock (q->qlock); \
@ -371,7 +391,7 @@ gst_queue_class_init (GstQueueClass * klass)
g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
g_param_spec_string ("temp-location", "Temporary File Location",
"Location of a temporary file to store data in (unused)",
"Location of a temporary file to store data in",
NULL, G_PARAM_READWRITE));
gst_element_class_add_pad_template (gstelement_class,
@ -443,6 +463,10 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
queue->item_del = g_cond_new ();
queue->queue = g_queue_new ();
/* tempfile related */
queue->temp_location = NULL;
queue->temp_file = NULL;
GST_DEBUG_OBJECT (queue,
"initialized queue's not_empty & not_full conditions");
}
@ -460,12 +484,17 @@ gst_queue_finalize (GObject * object)
gst_mini_object_unref (data);
}
g_queue_free (queue->queue);
g_mutex_free (queue->qlock);
g_cond_free (queue->item_add);
g_cond_free (queue->item_del);
g_timer_destroy (queue->timer);
/* temp_file path cleanup */
if (queue->temp_location != NULL)
g_free (queue->temp_location);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -719,20 +748,209 @@ update_rates (GstQueue * queue)
GST_TIME_ARGS (queue->cur_level.rate_time));
}
static void
gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer)
{
guint size;
guint8 *data;
fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
data = GST_BUFFER_DATA (buffer);
size = GST_BUFFER_SIZE (buffer);
fwrite (data, 1, size, queue->temp_file);
queue->writing_pos += size;
}
/* see if there is enough data in the file to read a full buffer */
static gboolean
gst_queue_have_data (GstQueue * queue, guint64 offset, guint length)
{
GST_DEBUG_OBJECT (queue,
"offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
length, queue->writing_pos);
if (queue->is_eos)
return TRUE;
if (offset + length < queue->writing_pos)
return TRUE;
return FALSE;
}
static GstFlowReturn
gst_queue_create_read (GstQueue * queue, guint64 offset, guint length,
GstBuffer ** buffer)
{
size_t res;
GstBuffer *buf;
off_t sres;
/* check if we have enough data at @offset. If there is not enough data, we
* block and wait. */
while (!gst_queue_have_data (queue, offset, length)) {
GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
}
sres = fseeko (queue->temp_file, offset, SEEK_SET);
if (G_UNLIKELY (sres < 0))
goto seek_failed;
buf = gst_buffer_new_and_alloc (length);
/* this should not block */
GST_LOG_OBJECT (queue, "Reading %d bytes", length);
res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file);
GST_LOG_OBJECT (queue, "read %d bytes", res);
if (G_UNLIKELY (res == 0)) {
/* check for errors or EOF */
if (ferror (queue->temp_file))
goto could_not_read;
if (feof (queue->temp_file) && length > 0)
goto eos;
}
length = res;
GST_BUFFER_SIZE (buf) = length;
GST_BUFFER_OFFSET (buf) = offset;
GST_BUFFER_OFFSET_END (buf) = offset + length;
*buffer = buf;
queue->reading_pos = offset + length;
return GST_FLOW_OK;
/* ERRORS */
out_flushing:
{
GST_DEBUG_OBJECT (queue, "we are flushing");
return GST_FLOW_WRONG_STATE;
}
seek_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return GST_FLOW_ERROR;
}
could_not_read:
{
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
eos:
{
GST_DEBUG ("non-regular file hits EOS");
gst_buffer_unref (buf);
return GST_FLOW_UNEXPECTED;
}
}
static GstMiniObject *
gst_queue_read_item_from_file (GstQueue * queue)
{
GstMiniObject *item;
if (queue->starting_segment != NULL) {
item = GST_MINI_OBJECT_CAST (queue->starting_segment);
queue->starting_segment = NULL;
} else {
GstFlowReturn ret;
GstBuffer *buffer;
ret =
gst_queue_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
&buffer);
switch (ret) {
case GST_FLOW_OK:
item = GST_MINI_OBJECT_CAST (buffer);
break;
case GST_FLOW_UNEXPECTED:
item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
break;
default:
item = NULL;
break;
}
}
return item;
}
static gboolean
gst_queue_open_temp_location_file (GstQueue * queue)
{
/* nothing to do */
if (queue->temp_location == NULL)
goto no_filename;
/* open the file for update/writing */
queue->temp_file = g_fopen (queue->temp_location, "wb+");
/* error creating file */
if (queue->temp_file == NULL)
goto open_failed;
queue->writing_pos = 0;
queue->reading_pos = 0;
return TRUE;
/* ERRORS */
no_filename:
{
GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
(_("No file name specified.")), (NULL));
return FALSE;
}
open_failed:
{
GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
(_("Could not open file \"%s\" for reading."), queue->temp_location),
GST_ERROR_SYSTEM);
return FALSE;
}
}
static void
gst_queue_close_temp_location_file (GstQueue * queue)
{
/* nothing to do */
if (queue->temp_file == NULL)
return;
/* we don't remove the file so that the application can use it as a cache
* later on */
fflush (queue->temp_file);
fclose (queue->temp_file);
remove (queue->temp_location);
queue->temp_file = NULL;
}
static void
gst_queue_locked_flush (GstQueue * queue)
{
while (!g_queue_is_empty (queue->queue)) {
GstMiniObject *data = g_queue_pop_head (queue->queue);
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
gst_queue_close_temp_location_file (queue);
gst_queue_open_temp_location_file (queue);
} else {
while (!g_queue_is_empty (queue->queue)) {
GstMiniObject *data = g_queue_pop_head (queue->queue);
/* Then lose another reference because we are supposed to destroy that
data when flushing */
gst_mini_object_unref (data);
/* Then lose another reference because we are supposed to destroy that
data when flushing */
gst_mini_object_unref (data);
}
}
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
queue->is_eos = FALSE;
if (queue->starting_segment != NULL)
gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL;
queue->segment_event_received = FALSE;
/* we deleted a lot of something */
GST_QUEUE_SIGNAL_DEL (queue);
@ -760,6 +978,10 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
/* update the buffering status */
update_buffering (queue);
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
gst_queue_write_buffer_to_file (queue, buffer);
}
} else if (GST_IS_EVENT (item)) {
GstEvent *event;
@ -773,8 +995,19 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
break;
case GST_EVENT_NEWSEGMENT:
apply_segment (queue, event, &queue->sink_segment);
/* This is our first new segment, we hold it
* as we can't save it on the temp file */
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (queue->segment_event_received)
goto unexpected_event;
queue->segment_event_received = TRUE;
queue->starting_segment = event;
}
break;
default:
if (QUEUE_IS_USING_TEMP_FILE (queue))
goto unexpected_event;
break;
}
} else {
@ -784,9 +1017,22 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
item = NULL;
}
if (item)
if (!QUEUE_IS_USING_TEMP_FILE (queue) && item)
g_queue_push_tail (queue->queue, item);
GST_QUEUE_SIGNAL_ADD (queue);
return;
/* ERRORS */
unexpected_event:
{
g_warning
("Unexpected event of kind %s can't be added in temp file of queue %s ",
gst_event_type_get_name (GST_EVENT_TYPE (item)),
GST_OBJECT_NAME (queue));
gst_event_unref (GST_EVENT_CAST (item));
return;
}
}
/* dequeue an item from the queue and update level stats */
@ -795,7 +1041,11 @@ gst_queue_locked_dequeue (GstQueue * queue)
{
GstMiniObject *item;
item = g_queue_pop_head (queue->queue);
if (QUEUE_IS_USING_TEMP_FILE (queue))
item = gst_queue_read_item_from_file (queue);
else
item = g_queue_pop_head (queue->queue);
if (item == NULL)
goto no_item;
@ -927,8 +1177,12 @@ gst_queue_is_empty (GstQueue * queue)
if (queue->is_eos)
return FALSE;
if (queue->queue->length == 0)
return TRUE;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
return queue->writing_pos == queue->reading_pos;
} else {
if (queue->queue->length == 0)
return TRUE;
}
return FALSE;
}
@ -942,6 +1196,10 @@ gst_queue_is_filled (GstQueue * queue)
if (queue->is_eos)
return TRUE;
/* if using file, we're never filled if we don't have EOS */
if (QUEUE_IS_USING_TEMP_FILE (queue))
return FALSE;
#define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
(queue->cur_level.format) >= (queue->max_level.format))
@ -1221,6 +1479,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
if (!gst_queue_open_temp_location_file (queue))
ret = GST_STATE_CHANGE_FAILURE;
}
queue->segment_event_received = FALSE;
queue->starting_segment = NULL;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
@ -1234,6 +1498,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (QUEUE_IS_USING_TEMP_FILE (queue))
gst_queue_close_temp_location_file (queue);
if (queue->starting_segment != NULL) {
gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL;
}
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
@ -1257,6 +1527,35 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
#define QUEUE_THRESHOLD_CHANGE(q)\
g_cond_signal (queue->item_add);
static gboolean
gst_queue_set_temp_location (GstQueue * queue, const gchar * location)
{
GstState state;
/* the element must be stopped in order to do this */
GST_OBJECT_LOCK (queue);
state = GST_STATE (queue);
if (state != GST_STATE_READY && state != GST_STATE_NULL)
goto wrong_state;
GST_OBJECT_UNLOCK (queue);
/* set new location */
g_free (queue->temp_location);
queue->temp_location = g_strdup (location);
g_object_notify (G_OBJECT (queue), "temp-location");
return TRUE;
/* ERROR */
wrong_state:
{
GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state");
GST_OBJECT_UNLOCK (queue);
return FALSE;
}
}
static void
gst_queue_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
@ -1296,7 +1595,7 @@ gst_queue_set_property (GObject * object,
queue->high_percent = g_value_get_int (value);
break;
case PROP_TEMP_LOCATION:
queue->temp_location = g_value_dup_string (value);
gst_queue_set_temp_location (queue, g_value_dup_string (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);