queue2: implement seeking in download mode

When in download mode and the requested offset is too far away, attempt to do a
seek request to fetch the data.
Keep track of all downloaded parts and merge ranges when needed.

Fixes #600877
This commit is contained in:
Wim Taymans 2010-03-23 19:25:29 +01:00
parent 4119945349
commit 3f4f5fa59d
2 changed files with 273 additions and 62 deletions

View file

@ -145,7 +145,7 @@ enum
queue->cur_level.time, \
queue->max_level.time, \
(guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
queue->writing_pos - queue->max_reading_pos : \
queue->current->writing_pos - queue->current->max_reading_pos : \
queue->queue->length))
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
@ -452,6 +452,107 @@ gst_queue2_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
debug_ranges (GstQueue2 * queue)
{
GstQueue2Range *walk;
for (walk = queue->ranges; walk; walk = walk->next) {
GST_DEBUG_OBJECT (queue, "range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT,
walk->offset, walk->writing_pos);
}
}
/* clear all the downloaded ranges */
static void
clean_ranges (GstQueue2 * queue)
{
GST_DEBUG_OBJECT (queue, "clean queue ranges");
g_slice_free_chain (GstQueue2Range, queue->ranges, next);
queue->ranges = NULL;
queue->current = NULL;
}
/* find a range that contains @offset or NULL when nothing does */
static GstQueue2Range *
find_range (GstQueue2 * queue, guint64 offset, guint64 length)
{
GstQueue2Range *range, *walk;
/* first do a quick check for the current range */
for (walk = queue->ranges; walk; walk = walk->next) {
if (offset >= walk->offset && offset <= walk->writing_pos) {
/* we can reuse an existing range */
range = walk;
break;
}
}
return range;
}
/* make a new range for @offset or reuse an existing range */
static GstQueue2Range *
add_range (GstQueue2 * queue, guint64 offset)
{
GstQueue2Range *range, *prev, *next;
GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
if ((range = find_range (queue, offset, 0))) {
GST_DEBUG_OBJECT (queue,
"reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
range->writing_pos);
range->writing_pos = offset;
} else {
GST_DEBUG_OBJECT (queue,
"new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
range = g_slice_new0 (GstQueue2Range);
range->offset = offset;
range->writing_pos = offset;
range->reading_pos = offset;
range->max_reading_pos = offset;
/* insert sorted */
prev = NULL;
next = queue->ranges;
while (next) {
if (next->offset > offset) {
/* insert before next */
GST_DEBUG_OBJECT (queue,
"insert before range %p, offset %" G_GUINT64_FORMAT, next,
next->offset);
break;
}
/* try next */
prev = next;
next = next->next;
}
range->next = next;
if (prev)
prev->next = range;
else
queue->ranges = range;
}
debug_ranges (queue);
return range;
}
/* clear and init the download ranges for offset 0 */
static void
init_ranges (GstQueue2 * queue)
{
GST_DEBUG_OBJECT (queue, "init queue ranges");
/* get rid of all the current ranges */
clean_ranges (queue);
/* make a range for offset 0 */
queue->current = add_range (queue, 0);
}
static gboolean
gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
{
@ -653,7 +754,7 @@ update_buffering (GstQueue2 * queue)
if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
buffering_left =
(gdouble) ((duration -
queue->writing_pos) * 1000) / queue->byte_in_rate;
queue->current->writing_pos) * 1000) / queue->byte_in_rate;
} else {
buffering_left = G_MAXINT64;
}
@ -785,13 +886,18 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
guint size;
guint8 *data;
int ret;
guint64 writing_pos, max_reading_pos;
GstQueue2Range *next;
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
#ifdef HAVE_FSEEKO
fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET);
fseeko (queue->temp_file, (off_t) writing_pos, SEEK_SET);
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET);
lseek (fileno (queue->temp_file), (off_t) writing_pos, SEEK_SET);
#else
fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
fseek (queue->temp_file, writing_pos, SEEK_SET);
#endif
data = GST_BUFFER_DATA (buffer);
@ -802,26 +908,87 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
/* FIXME do something useful here */
GST_ERROR_OBJECT (queue, "fwrite returned error");
}
queue->writing_pos += size;
writing_pos += size;
if (queue->writing_pos > queue->max_reading_pos)
queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
if (writing_pos > max_reading_pos)
queue->cur_level.bytes = writing_pos - max_reading_pos;
else
queue->cur_level.bytes = 0;
/* try to merge with next range */
while ((next = queue->current->next)) {
GST_DEBUG_OBJECT (queue,
"cheking %" G_GUINT64_FORMAT " < %" G_GUINT64_FORMAT, writing_pos,
next->offset);
if (writing_pos < next->offset)
break;
GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
next->writing_pos);
/* we ran over the offset of the next group */
queue->current->writing_pos = writing_pos = next->writing_pos;
/* remove the group */
queue->current->next = next->next;
g_slice_free (GstQueue2Range, next);
debug_ranges (queue);
}
queue->current->writing_pos = writing_pos;
}
static gboolean
perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
{
GstEvent *event;
gboolean res;
GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
event =
gst_event_new_seek (1.0, GST_FORMAT_BYTES,
GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
GST_SEEK_TYPE_NONE, -1);
GST_QUEUE2_MUTEX_UNLOCK (queue);
res = gst_pad_push_event (queue->sinkpad, event);
GST_QUEUE2_MUTEX_LOCK (queue);
if (res) {
queue->current = add_range (queue, offset);
}
return res;
}
/* see if there is enough data in the file to read a full buffer */
static gboolean
gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
{
GST_DEBUG_OBJECT (queue,
"offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
length, queue->writing_pos);
if (queue->is_eos)
return TRUE;
GstQueue2Range *range;
if (offset + length < queue->writing_pos)
return TRUE;
GST_DEBUG_OBJECT (queue, "offset %" G_GUINT64_FORMAT ", len %u", offset,
length);
if ((range = find_range (queue, offset, length))) {
if (queue->current != range) {
GST_DEBUG_OBJECT (queue, "switching ranges");
perform_seek_to_offset (queue, range->writing_pos);
}
/* we have a range for offset */
GST_DEBUG_OBJECT (queue,
"we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
if (queue->is_eos)
return TRUE;
if (offset + length < range->writing_pos)
return TRUE;
} else {
/* we don't have the range, see how far away we are */
perform_seek_to_offset (queue, offset);
}
return FALSE;
}
@ -832,6 +999,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
{
size_t res;
GstBuffer *buf;
guint64 reading_pos, max_reading_pos, writing_pos;
/* check if we have enough data at @offset. If there is not enough data, we
* block and wait. */
@ -874,14 +1042,21 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
*buffer = buf;
queue->reading_pos = offset + length;
queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos);
reading_pos = queue->current->reading_pos;
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
if (queue->writing_pos > queue->max_reading_pos)
queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
reading_pos = offset + length;
max_reading_pos = MAX (max_reading_pos, reading_pos);
if (writing_pos > max_reading_pos)
queue->cur_level.bytes = writing_pos - max_reading_pos;
else
queue->cur_level.bytes = 0;
queue->current->reading_pos = reading_pos;
queue->current->max_reading_pos = max_reading_pos;
return GST_FLOW_OK;
/* ERRORS */
@ -921,9 +1096,12 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
} else {
GstFlowReturn ret;
GstBuffer *buffer;
guint64 reading_pos;
reading_pos = queue->current->reading_pos;
ret =
gst_queue2_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
&buffer);
switch (ret) {
case GST_FLOW_OK:
@ -946,6 +1124,9 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue)
gint fd = -1;
gchar *name = NULL;
if (queue->temp_file)
goto already_opened;
GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
/* we have two cases:
@ -982,14 +1163,18 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue)
if (queue->temp_file == NULL)
goto open_failed;
}
GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
queue->writing_pos = 0;
queue->reading_pos = 0;
queue->max_reading_pos = 0;
init_ranges (queue);
return TRUE;
/* ERRORS */
already_opened:
{
GST_DEBUG_OBJECT (queue, "temp file was already open");
return TRUE;
}
no_directory:
{
GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
@ -1031,6 +1216,7 @@ gst_queue2_close_temp_location_file (GstQueue2 * queue)
remove (queue->temp_location);
queue->temp_file = NULL;
clean_ranges (queue);
}
static void
@ -1043,9 +1229,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
queue->writing_pos = 0;
queue->reading_pos = 0;
queue->max_reading_pos = 0;
init_ranges (queue);
}
static void
@ -1108,6 +1292,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
case GST_EVENT_EOS:
/* Zero the thresholds, this makes sure the queue is completely
* filled and we can read all data from the queue. */
GST_DEBUG_OBJECT (queue, "we have EOS");
queue->is_eos = TRUE;
break;
case GST_EVENT_NEWSEGMENT:
@ -1245,39 +1430,50 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
case GST_EVENT_FLUSH_START:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
/* forward event */
gst_pad_push_event (queue->srcpad, event);
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
/* now unblock the chain function */
GST_QUEUE2_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
/* unblock the loop and chain functions */
g_cond_signal (queue->item_add);
g_cond_signal (queue->item_del);
GST_QUEUE2_MUTEX_UNLOCK (queue);
/* now unblock the chain function */
GST_QUEUE2_MUTEX_LOCK (queue);
queue->srcresult = GST_FLOW_WRONG_STATE;
/* unblock the loop and chain functions */
g_cond_signal (queue->item_add);
g_cond_signal (queue->item_del);
GST_QUEUE2_MUTEX_UNLOCK (queue);
/* make sure it pauses, this should happen since we sent
* flush_start downstream. */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
/* make sure it pauses, this should happen since we sent
* flush_start downstream. */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
}
goto done;
}
case GST_EVENT_FLUSH_STOP:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
/* forward event */
gst_pad_push_event (queue->srcpad, event);
GST_QUEUE2_MUTEX_LOCK (queue);
gst_queue2_locked_flush (queue);
queue->srcresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
/* reset rate counters */
reset_rate_timer (queue);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
queue->srcpad);
GST_QUEUE2_MUTEX_UNLOCK (queue);
if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
GST_QUEUE2_MUTEX_LOCK (queue);
gst_queue2_locked_flush (queue);
queue->srcresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
/* reset rate counters */
reset_rate_timer (queue);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
queue->srcpad);
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);
queue->segment_event_received = FALSE;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
}
goto done;
}
default:
@ -1323,7 +1519,7 @@ gst_queue2_is_empty (GstQueue2 * queue)
return FALSE;
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
return queue->writing_pos == queue->max_reading_pos;
return queue->current->writing_pos == queue->current->max_reading_pos;
} else {
if (queue->queue->length == 0)
return TRUE;
@ -1626,7 +1822,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
/* just forward upstream */
res = gst_pad_push_event (queue->sinkpad, event);
} else {
/* when using a temp file, we unblock the pending read */
/* when using a temp file, we eat the event */
res = TRUE;
gst_event_unref (event);
}
@ -1706,6 +1902,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
} else {
gint64 start, stop;
guint64 writing_pos;
gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
@ -1721,20 +1918,22 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
&duration))
goto peer_failed;
writing_pos = queue->current->writing_pos;
GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %"
G_GINT64_FORMAT, duration, queue->writing_pos);
G_GINT64_FORMAT, duration, writing_pos);
start = 0;
/* get our available data relative to the duration */
if (duration != -1)
stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration;
stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
else
stop = -1;
break;
}
case GST_FORMAT_BYTES:
start = 0;
stop = queue->writing_pos;
stop = queue->current->writing_pos;
break;
default:
start = -1;
@ -1774,7 +1973,7 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
offset = (offset == -1) ? queue->reading_pos : offset;
offset = (offset == -1) ? queue->current->reading_pos : offset;
/* function will block when the range is not yet available */
ret = gst_queue2_create_read (queue, offset, length, buffer);
@ -1887,12 +2086,14 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
if (active) {
if (QUEUE_IS_USING_TEMP_FILE (queue)) {
/* open the temp file now */
result = gst_queue2_open_temp_location_file (queue);
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating pull mode");
queue->srcresult = GST_FLOW_OK;
queue->is_eos = FALSE;
queue->unexpected = FALSE;
result = TRUE;
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);

View file

@ -45,6 +45,7 @@ G_BEGIN_DECLS
typedef struct _GstQueue2 GstQueue2;
typedef struct _GstQueue2Size GstQueue2Size;
typedef struct _GstQueue2Class GstQueue2Class;
typedef struct _GstQueue2Range GstQueue2Range;
/* used to keep track of sizes (current and max) */
struct _GstQueue2Size
@ -55,6 +56,16 @@ struct _GstQueue2Size
guint64 rate_time;
};
struct _GstQueue2Range
{
GstQueue2Range *next;
guint64 offset;
guint64 writing_pos;
guint64 reading_pos;
guint64 max_reading_pos;
};
struct _GstQueue2
{
GstElement element;
@ -112,14 +123,13 @@ struct _GstQueue2
gchar *temp_location;
gboolean temp_remove;
FILE *temp_file;
guint64 writing_pos;
guint64 reading_pos;
guint64 max_reading_pos;
/* list of downloaded areas and the current area */
GstQueue2Range *ranges;
GstQueue2Range *current;
/* we need this to send the first new segment event of the stream
* because we can't save it on the file */
gboolean segment_event_received;
GstEvent *starting_segment;
};
struct _GstQueue2Class