mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-22 00:06:36 +00:00
gst/playback/gstqueue2.c: Add support for filebased buffering. Fixes #441264.
Original commit message from CVS: Based on patch by: Thiago Sousa Santos <thiagossantos at gmail dot com> * gst/playback/gstqueue2.c: (gst_queue_class_init), (gst_queue_init), (gst_queue_finalize), (gst_queue_write_buffer_to_file), (gst_queue_have_data), (gst_queue_create_read), (gst_queue_read_item_from_file), (gst_queue_open_temp_location_file), (gst_queue_close_temp_location_file), (gst_queue_locked_flush), (gst_queue_locked_enqueue), (gst_queue_locked_dequeue), (gst_queue_is_empty), (gst_queue_is_filled), (gst_queue_change_state), (gst_queue_set_temp_location), (gst_queue_set_property): Add support for filebased buffering. Fixes #441264.
This commit is contained in:
parent
b2486d65c6
commit
14d39928f5
1 changed files with 314 additions and 15 deletions
|
@ -1,9 +1,8 @@
|
|||
/* GStreamer
|
||||
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
|
||||
* 2000 Wim Taymans <wtay@chello.be>
|
||||
* 2003 Colin Walters <cwalters@gnome.org>
|
||||
* 2005 Wim Taymans <wim@fluendo.com>
|
||||
* 2007 Wim Taymans <wim@fluendo.com>
|
||||
* 2000,2005,2007 Wim Taymans <wim@fluendo.com>
|
||||
* 2007 Thiago Sousa Santos <thiagossantos at gmail dot com>
|
||||
*
|
||||
* gstqueue2.c:
|
||||
*
|
||||
|
@ -41,12 +40,20 @@
|
|||
*
|
||||
* The default queue size limits are 100 buffers, 2MB of data, or
|
||||
* two seconds worth of data, whichever is reached first.
|
||||
*
|
||||
* If you set temp-location, the element will buffer data on the file
|
||||
* specified by it. By using this, it will buffer the entire
|
||||
* stream data on the file independently of the queue size limits, they
|
||||
* will only be used for buffering statistics.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifdef HAVE_CONFIG_H
|
||||
#include "config.h"
|
||||
#endif
|
||||
|
||||
#include <glib/gstdio.h>
|
||||
|
||||
#include <gst/gst.h>
|
||||
#include <gst/gst-i18n-plugin.h>
|
||||
|
||||
|
@ -83,6 +90,10 @@ enum
|
|||
#define DEFAULT_LOW_PERCENT 10
|
||||
#define DEFAULT_HIGH_PERCENT 99
|
||||
|
||||
/* other defines */
|
||||
#define DEFAULT_BUFFER_SIZE 4096
|
||||
#define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL)
|
||||
|
||||
enum
|
||||
{
|
||||
PROP_0,
|
||||
|
@ -176,6 +187,13 @@ struct _GstQueue
|
|||
|
||||
/* temp location stuff */
|
||||
gchar *temp_location;
|
||||
FILE *temp_file;
|
||||
guint64 writing_pos;
|
||||
guint64 reading_pos;
|
||||
/* we need this to send the first new segment event of the stream
|
||||
* because we can't save it on the file */
|
||||
gboolean segment_event_received;
|
||||
GstEvent *starting_segment;
|
||||
};
|
||||
|
||||
struct _GstQueueClass
|
||||
|
@ -195,7 +213,9 @@ struct _GstQueueClass
|
|||
queue->max_level.bytes, \
|
||||
queue->cur_level.time, \
|
||||
queue->max_level.time, \
|
||||
queue->queue->length)
|
||||
QUEUE_IS_USING_TEMP_FILE(queue) ? \
|
||||
queue->writing_pos - queue->reading_pos : \
|
||||
queue->queue->length)
|
||||
|
||||
#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
|
||||
g_mutex_lock (q->qlock); \
|
||||
|
@ -371,7 +391,7 @@ gst_queue_class_init (GstQueueClass * klass)
|
|||
|
||||
g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
|
||||
g_param_spec_string ("temp-location", "Temporary File Location",
|
||||
"Location of a temporary file to store data in (unused)",
|
||||
"Location of a temporary file to store data in",
|
||||
NULL, G_PARAM_READWRITE));
|
||||
|
||||
gst_element_class_add_pad_template (gstelement_class,
|
||||
|
@ -443,6 +463,10 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
|
|||
queue->item_del = g_cond_new ();
|
||||
queue->queue = g_queue_new ();
|
||||
|
||||
/* tempfile related */
|
||||
queue->temp_location = NULL;
|
||||
queue->temp_file = NULL;
|
||||
|
||||
GST_DEBUG_OBJECT (queue,
|
||||
"initialized queue's not_empty & not_full conditions");
|
||||
}
|
||||
|
@ -460,12 +484,17 @@ gst_queue_finalize (GObject * object)
|
|||
|
||||
gst_mini_object_unref (data);
|
||||
}
|
||||
|
||||
g_queue_free (queue->queue);
|
||||
g_mutex_free (queue->qlock);
|
||||
g_cond_free (queue->item_add);
|
||||
g_cond_free (queue->item_del);
|
||||
g_timer_destroy (queue->timer);
|
||||
|
||||
/* temp_file path cleanup */
|
||||
if (queue->temp_location != NULL)
|
||||
g_free (queue->temp_location);
|
||||
|
||||
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||
}
|
||||
|
||||
|
@ -719,20 +748,209 @@ update_rates (GstQueue * queue)
|
|||
GST_TIME_ARGS (queue->cur_level.rate_time));
|
||||
}
|
||||
|
||||
static void
|
||||
gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer)
|
||||
{
|
||||
guint size;
|
||||
guint8 *data;
|
||||
|
||||
fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
|
||||
|
||||
data = GST_BUFFER_DATA (buffer);
|
||||
size = GST_BUFFER_SIZE (buffer);
|
||||
|
||||
fwrite (data, 1, size, queue->temp_file);
|
||||
queue->writing_pos += size;
|
||||
}
|
||||
|
||||
/* see if there is enough data in the file to read a full buffer */
|
||||
static gboolean
|
||||
gst_queue_have_data (GstQueue * queue, guint64 offset, guint length)
|
||||
{
|
||||
GST_DEBUG_OBJECT (queue,
|
||||
"offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
|
||||
length, queue->writing_pos);
|
||||
if (queue->is_eos)
|
||||
return TRUE;
|
||||
|
||||
if (offset + length < queue->writing_pos)
|
||||
return TRUE;
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
static GstFlowReturn
|
||||
gst_queue_create_read (GstQueue * queue, guint64 offset, guint length,
|
||||
GstBuffer ** buffer)
|
||||
{
|
||||
size_t res;
|
||||
GstBuffer *buf;
|
||||
off_t sres;
|
||||
|
||||
/* check if we have enough data at @offset. If there is not enough data, we
|
||||
* block and wait. */
|
||||
while (!gst_queue_have_data (queue, offset, length)) {
|
||||
GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
|
||||
}
|
||||
|
||||
sres = fseeko (queue->temp_file, offset, SEEK_SET);
|
||||
if (G_UNLIKELY (sres < 0))
|
||||
goto seek_failed;
|
||||
|
||||
buf = gst_buffer_new_and_alloc (length);
|
||||
|
||||
/* this should not block */
|
||||
GST_LOG_OBJECT (queue, "Reading %d bytes", length);
|
||||
res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file);
|
||||
GST_LOG_OBJECT (queue, "read %d bytes", res);
|
||||
|
||||
if (G_UNLIKELY (res == 0)) {
|
||||
/* check for errors or EOF */
|
||||
if (ferror (queue->temp_file))
|
||||
goto could_not_read;
|
||||
if (feof (queue->temp_file) && length > 0)
|
||||
goto eos;
|
||||
}
|
||||
|
||||
length = res;
|
||||
|
||||
GST_BUFFER_SIZE (buf) = length;
|
||||
GST_BUFFER_OFFSET (buf) = offset;
|
||||
GST_BUFFER_OFFSET_END (buf) = offset + length;
|
||||
|
||||
*buffer = buf;
|
||||
|
||||
queue->reading_pos = offset + length;
|
||||
|
||||
return GST_FLOW_OK;
|
||||
|
||||
/* ERRORS */
|
||||
out_flushing:
|
||||
{
|
||||
GST_DEBUG_OBJECT (queue, "we are flushing");
|
||||
return GST_FLOW_WRONG_STATE;
|
||||
}
|
||||
seek_failed:
|
||||
{
|
||||
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
could_not_read:
|
||||
{
|
||||
GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
|
||||
gst_buffer_unref (buf);
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
eos:
|
||||
{
|
||||
GST_DEBUG ("non-regular file hits EOS");
|
||||
gst_buffer_unref (buf);
|
||||
return GST_FLOW_UNEXPECTED;
|
||||
}
|
||||
}
|
||||
|
||||
static GstMiniObject *
|
||||
gst_queue_read_item_from_file (GstQueue * queue)
|
||||
{
|
||||
GstMiniObject *item;
|
||||
|
||||
if (queue->starting_segment != NULL) {
|
||||
item = GST_MINI_OBJECT_CAST (queue->starting_segment);
|
||||
queue->starting_segment = NULL;
|
||||
} else {
|
||||
GstFlowReturn ret;
|
||||
GstBuffer *buffer;
|
||||
|
||||
ret =
|
||||
gst_queue_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
|
||||
&buffer);
|
||||
switch (ret) {
|
||||
case GST_FLOW_OK:
|
||||
item = GST_MINI_OBJECT_CAST (buffer);
|
||||
break;
|
||||
case GST_FLOW_UNEXPECTED:
|
||||
item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
|
||||
break;
|
||||
default:
|
||||
item = NULL;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return item;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_queue_open_temp_location_file (GstQueue * queue)
|
||||
{
|
||||
/* nothing to do */
|
||||
if (queue->temp_location == NULL)
|
||||
goto no_filename;
|
||||
|
||||
/* open the file for update/writing */
|
||||
queue->temp_file = g_fopen (queue->temp_location, "wb+");
|
||||
/* error creating file */
|
||||
if (queue->temp_file == NULL)
|
||||
goto open_failed;
|
||||
|
||||
queue->writing_pos = 0;
|
||||
queue->reading_pos = 0;
|
||||
|
||||
return TRUE;
|
||||
|
||||
/* ERRORS */
|
||||
no_filename:
|
||||
{
|
||||
GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
|
||||
(_("No file name specified.")), (NULL));
|
||||
return FALSE;
|
||||
}
|
||||
open_failed:
|
||||
{
|
||||
GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
|
||||
(_("Could not open file \"%s\" for reading."), queue->temp_location),
|
||||
GST_ERROR_SYSTEM);
|
||||
return FALSE;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
gst_queue_close_temp_location_file (GstQueue * queue)
|
||||
{
|
||||
/* nothing to do */
|
||||
if (queue->temp_file == NULL)
|
||||
return;
|
||||
|
||||
/* we don't remove the file so that the application can use it as a cache
|
||||
* later on */
|
||||
fflush (queue->temp_file);
|
||||
fclose (queue->temp_file);
|
||||
remove (queue->temp_location);
|
||||
queue->temp_file = NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_queue_locked_flush (GstQueue * queue)
|
||||
{
|
||||
while (!g_queue_is_empty (queue->queue)) {
|
||||
GstMiniObject *data = g_queue_pop_head (queue->queue);
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
|
||||
gst_queue_close_temp_location_file (queue);
|
||||
gst_queue_open_temp_location_file (queue);
|
||||
} else {
|
||||
while (!g_queue_is_empty (queue->queue)) {
|
||||
GstMiniObject *data = g_queue_pop_head (queue->queue);
|
||||
|
||||
/* Then lose another reference because we are supposed to destroy that
|
||||
data when flushing */
|
||||
gst_mini_object_unref (data);
|
||||
/* Then lose another reference because we are supposed to destroy that
|
||||
data when flushing */
|
||||
gst_mini_object_unref (data);
|
||||
}
|
||||
}
|
||||
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
|
||||
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
|
||||
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
|
||||
queue->is_eos = FALSE;
|
||||
if (queue->starting_segment != NULL)
|
||||
gst_event_unref (queue->starting_segment);
|
||||
queue->starting_segment = NULL;
|
||||
queue->segment_event_received = FALSE;
|
||||
|
||||
/* we deleted a lot of something */
|
||||
GST_QUEUE_SIGNAL_DEL (queue);
|
||||
|
@ -760,6 +978,10 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
|
|||
/* update the buffering status */
|
||||
update_buffering (queue);
|
||||
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
|
||||
gst_queue_write_buffer_to_file (queue, buffer);
|
||||
}
|
||||
|
||||
} else if (GST_IS_EVENT (item)) {
|
||||
GstEvent *event;
|
||||
|
||||
|
@ -773,8 +995,19 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
|
|||
break;
|
||||
case GST_EVENT_NEWSEGMENT:
|
||||
apply_segment (queue, event, &queue->sink_segment);
|
||||
/* This is our first new segment, we hold it
|
||||
* as we can't save it on the temp file */
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
|
||||
if (queue->segment_event_received)
|
||||
goto unexpected_event;
|
||||
|
||||
queue->segment_event_received = TRUE;
|
||||
queue->starting_segment = event;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue))
|
||||
goto unexpected_event;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
|
@ -784,9 +1017,22 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
|
|||
item = NULL;
|
||||
}
|
||||
|
||||
if (item)
|
||||
if (!QUEUE_IS_USING_TEMP_FILE (queue) && item)
|
||||
g_queue_push_tail (queue->queue, item);
|
||||
GST_QUEUE_SIGNAL_ADD (queue);
|
||||
|
||||
return;
|
||||
|
||||
/* ERRORS */
|
||||
unexpected_event:
|
||||
{
|
||||
g_warning
|
||||
("Unexpected event of kind %s can't be added in temp file of queue %s ",
|
||||
gst_event_type_get_name (GST_EVENT_TYPE (item)),
|
||||
GST_OBJECT_NAME (queue));
|
||||
gst_event_unref (GST_EVENT_CAST (item));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* dequeue an item from the queue and update level stats */
|
||||
|
@ -795,7 +1041,11 @@ gst_queue_locked_dequeue (GstQueue * queue)
|
|||
{
|
||||
GstMiniObject *item;
|
||||
|
||||
item = g_queue_pop_head (queue->queue);
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue))
|
||||
item = gst_queue_read_item_from_file (queue);
|
||||
else
|
||||
item = g_queue_pop_head (queue->queue);
|
||||
|
||||
if (item == NULL)
|
||||
goto no_item;
|
||||
|
||||
|
@ -927,8 +1177,12 @@ gst_queue_is_empty (GstQueue * queue)
|
|||
if (queue->is_eos)
|
||||
return FALSE;
|
||||
|
||||
if (queue->queue->length == 0)
|
||||
return TRUE;
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
|
||||
return queue->writing_pos == queue->reading_pos;
|
||||
} else {
|
||||
if (queue->queue->length == 0)
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
@ -942,6 +1196,10 @@ gst_queue_is_filled (GstQueue * queue)
|
|||
if (queue->is_eos)
|
||||
return TRUE;
|
||||
|
||||
/* if using file, we're never filled if we don't have EOS */
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue))
|
||||
return FALSE;
|
||||
|
||||
#define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
|
||||
(queue->cur_level.format) >= (queue->max_level.format))
|
||||
|
||||
|
@ -1221,6 +1479,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
|
|||
case GST_STATE_CHANGE_NULL_TO_READY:
|
||||
break;
|
||||
case GST_STATE_CHANGE_READY_TO_PAUSED:
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
|
||||
if (!gst_queue_open_temp_location_file (queue))
|
||||
ret = GST_STATE_CHANGE_FAILURE;
|
||||
}
|
||||
queue->segment_event_received = FALSE;
|
||||
queue->starting_segment = NULL;
|
||||
break;
|
||||
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
|
||||
break;
|
||||
|
@ -1234,6 +1498,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
|
|||
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
|
||||
break;
|
||||
case GST_STATE_CHANGE_PAUSED_TO_READY:
|
||||
if (QUEUE_IS_USING_TEMP_FILE (queue))
|
||||
gst_queue_close_temp_location_file (queue);
|
||||
if (queue->starting_segment != NULL) {
|
||||
gst_event_unref (queue->starting_segment);
|
||||
queue->starting_segment = NULL;
|
||||
}
|
||||
break;
|
||||
case GST_STATE_CHANGE_READY_TO_NULL:
|
||||
break;
|
||||
|
@ -1257,6 +1527,35 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
|
|||
#define QUEUE_THRESHOLD_CHANGE(q)\
|
||||
g_cond_signal (queue->item_add);
|
||||
|
||||
static gboolean
|
||||
gst_queue_set_temp_location (GstQueue * queue, const gchar * location)
|
||||
{
|
||||
GstState state;
|
||||
|
||||
/* the element must be stopped in order to do this */
|
||||
GST_OBJECT_LOCK (queue);
|
||||
state = GST_STATE (queue);
|
||||
if (state != GST_STATE_READY && state != GST_STATE_NULL)
|
||||
goto wrong_state;
|
||||
GST_OBJECT_UNLOCK (queue);
|
||||
|
||||
/* set new location */
|
||||
g_free (queue->temp_location);
|
||||
queue->temp_location = g_strdup (location);
|
||||
|
||||
g_object_notify (G_OBJECT (queue), "temp-location");
|
||||
|
||||
return TRUE;
|
||||
|
||||
/* ERROR */
|
||||
wrong_state:
|
||||
{
|
||||
GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state");
|
||||
GST_OBJECT_UNLOCK (queue);
|
||||
return FALSE;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
gst_queue_set_property (GObject * object,
|
||||
guint prop_id, const GValue * value, GParamSpec * pspec)
|
||||
|
@ -1296,7 +1595,7 @@ gst_queue_set_property (GObject * object,
|
|||
queue->high_percent = g_value_get_int (value);
|
||||
break;
|
||||
case PROP_TEMP_LOCATION:
|
||||
queue->temp_location = g_value_dup_string (value);
|
||||
gst_queue_set_temp_location (queue, g_value_dup_string (value));
|
||||
break;
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
|
|
Loading…
Reference in a new issue