diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index f5f715a8c3..532a317b10 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -145,7 +145,7 @@ enum queue->cur_level.time, \ queue->max_level.time, \ (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \ - queue->writing_pos - queue->max_reading_pos : \ + queue->current->writing_pos - queue->current->max_reading_pos : \ queue->queue->length)) #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \ @@ -452,6 +452,107 @@ gst_queue2_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } +static void +debug_ranges (GstQueue2 * queue) +{ + GstQueue2Range *walk; + + for (walk = queue->ranges; walk; walk = walk->next) { + GST_DEBUG_OBJECT (queue, "range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, + walk->offset, walk->writing_pos); + } +} + +/* clear all the downloaded ranges */ +static void +clean_ranges (GstQueue2 * queue) +{ + GST_DEBUG_OBJECT (queue, "clean queue ranges"); + + g_slice_free_chain (GstQueue2Range, queue->ranges, next); + queue->ranges = NULL; + queue->current = NULL; +} + +/* find a range that contains @offset or NULL when nothing does */ +static GstQueue2Range * +find_range (GstQueue2 * queue, guint64 offset, guint64 length) +{ + GstQueue2Range *range, *walk; + + /* first do a quick check for the current range */ + for (walk = queue->ranges; walk; walk = walk->next) { + if (offset >= walk->offset && offset <= walk->writing_pos) { + /* we can reuse an existing range */ + range = walk; + break; + } + } + return range; +} + +/* make a new range for @offset or reuse an existing range */ +static GstQueue2Range * +add_range (GstQueue2 * queue, guint64 offset) +{ + GstQueue2Range *range, *prev, *next; + + GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset); + + if ((range = find_range (queue, offset, 0))) { + GST_DEBUG_OBJECT (queue, + "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset, + range->writing_pos); + range->writing_pos = offset; + } else { + GST_DEBUG_OBJECT (queue, + "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset); + + range = g_slice_new0 (GstQueue2Range); + range->offset = offset; + range->writing_pos = offset; + range->reading_pos = offset; + range->max_reading_pos = offset; + + /* insert sorted */ + prev = NULL; + next = queue->ranges; + while (next) { + if (next->offset > offset) { + /* insert before next */ + GST_DEBUG_OBJECT (queue, + "insert before range %p, offset %" G_GUINT64_FORMAT, next, + next->offset); + break; + } + /* try next */ + prev = next; + next = next->next; + } + range->next = next; + if (prev) + prev->next = range; + else + queue->ranges = range; + } + debug_ranges (queue); + + return range; +} + + +/* clear and init the download ranges for offset 0 */ +static void +init_ranges (GstQueue2 * queue) +{ + GST_DEBUG_OBJECT (queue, "init queue ranges"); + + /* get rid of all the current ranges */ + clean_ranges (queue); + /* make a range for offset 0 */ + queue->current = add_range (queue, 0); +} + static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps) { @@ -653,7 +754,7 @@ update_buffering (GstQueue2 * queue) if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration)) buffering_left = (gdouble) ((duration - - queue->writing_pos) * 1000) / queue->byte_in_rate; + queue->current->writing_pos) * 1000) / queue->byte_in_rate; } else { buffering_left = G_MAXINT64; } @@ -785,13 +886,18 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) guint size; guint8 *data; int ret; + guint64 writing_pos, max_reading_pos; + GstQueue2Range *next; + + writing_pos = queue->current->writing_pos; + max_reading_pos = queue->current->max_reading_pos; #ifdef HAVE_FSEEKO - fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET); + fseeko (queue->temp_file, (off_t) writing_pos, SEEK_SET); #elif defined (G_OS_UNIX) || defined (G_OS_WIN32) - lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET); + lseek (fileno (queue->temp_file), (off_t) writing_pos, SEEK_SET); #else - fseek (queue->temp_file, queue->writing_pos, SEEK_SET); + fseek (queue->temp_file, writing_pos, SEEK_SET); #endif data = GST_BUFFER_DATA (buffer); @@ -802,26 +908,87 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) /* FIXME do something useful here */ GST_ERROR_OBJECT (queue, "fwrite returned error"); } - queue->writing_pos += size; + writing_pos += size; - if (queue->writing_pos > queue->max_reading_pos) - queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos; + if (writing_pos > max_reading_pos) + queue->cur_level.bytes = writing_pos - max_reading_pos; else queue->cur_level.bytes = 0; + + /* try to merge with next range */ + while ((next = queue->current->next)) { + GST_DEBUG_OBJECT (queue, + "cheking %" G_GUINT64_FORMAT " < %" G_GUINT64_FORMAT, writing_pos, + next->offset); + if (writing_pos < next->offset) + break; + + GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT, + next->writing_pos); + /* we ran over the offset of the next group */ + queue->current->writing_pos = writing_pos = next->writing_pos; + + /* remove the group */ + queue->current->next = next->next; + g_slice_free (GstQueue2Range, next); + + debug_ranges (queue); + } + queue->current->writing_pos = writing_pos; +} + +static gboolean +perform_seek_to_offset (GstQueue2 * queue, guint64 offset) +{ + GstEvent *event; + gboolean res; + + GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset); + + event = + gst_event_new_seek (1.0, GST_FORMAT_BYTES, + GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset, + GST_SEEK_TYPE_NONE, -1); + + GST_QUEUE2_MUTEX_UNLOCK (queue); + res = gst_pad_push_event (queue->sinkpad, event); + GST_QUEUE2_MUTEX_LOCK (queue); + + if (res) { + queue->current = add_range (queue, offset); + } + return res; } /* see if there is enough data in the file to read a full buffer */ static gboolean gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) { - GST_DEBUG_OBJECT (queue, - "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset, - length, queue->writing_pos); - if (queue->is_eos) - return TRUE; + GstQueue2Range *range; - if (offset + length < queue->writing_pos) - return TRUE; + GST_DEBUG_OBJECT (queue, "offset %" G_GUINT64_FORMAT ", len %u", offset, + length); + + if ((range = find_range (queue, offset, length))) { + if (queue->current != range) { + GST_DEBUG_OBJECT (queue, "switching ranges"); + perform_seek_to_offset (queue, range->writing_pos); + } + /* we have a range for offset */ + GST_DEBUG_OBJECT (queue, + "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %" + G_GUINT64_FORMAT, range, range->offset, range->writing_pos); + + if (queue->is_eos) + return TRUE; + + if (offset + length < range->writing_pos) + return TRUE; + + } else { + /* we don't have the range, see how far away we are */ + perform_seek_to_offset (queue, offset); + } return FALSE; } @@ -832,6 +999,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, { size_t res; GstBuffer *buf; + guint64 reading_pos, max_reading_pos, writing_pos; /* check if we have enough data at @offset. If there is not enough data, we * block and wait. */ @@ -874,14 +1042,21 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, *buffer = buf; - queue->reading_pos = offset + length; - queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos); + reading_pos = queue->current->reading_pos; + writing_pos = queue->current->writing_pos; + max_reading_pos = queue->current->max_reading_pos; - if (queue->writing_pos > queue->max_reading_pos) - queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos; + reading_pos = offset + length; + max_reading_pos = MAX (max_reading_pos, reading_pos); + + if (writing_pos > max_reading_pos) + queue->cur_level.bytes = writing_pos - max_reading_pos; else queue->cur_level.bytes = 0; + queue->current->reading_pos = reading_pos; + queue->current->max_reading_pos = max_reading_pos; + return GST_FLOW_OK; /* ERRORS */ @@ -921,9 +1096,12 @@ gst_queue2_read_item_from_file (GstQueue2 * queue) } else { GstFlowReturn ret; GstBuffer *buffer; + guint64 reading_pos; + + reading_pos = queue->current->reading_pos; ret = - gst_queue2_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE, + gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE, &buffer); switch (ret) { case GST_FLOW_OK: @@ -946,6 +1124,9 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue) gint fd = -1; gchar *name = NULL; + if (queue->temp_file) + goto already_opened; + GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template); /* we have two cases: @@ -982,14 +1163,18 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue) if (queue->temp_file == NULL) goto open_failed; } + GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template); - queue->writing_pos = 0; - queue->reading_pos = 0; - queue->max_reading_pos = 0; + init_ranges (queue); return TRUE; /* ERRORS */ +already_opened: + { + GST_DEBUG_OBJECT (queue, "temp file was already open"); + return TRUE; + } no_directory: { GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND, @@ -1031,6 +1216,7 @@ gst_queue2_close_temp_location_file (GstQueue2 * queue) remove (queue->temp_location); queue->temp_file = NULL; + clean_ranges (queue); } static void @@ -1043,9 +1229,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue) queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file); - queue->writing_pos = 0; - queue->reading_pos = 0; - queue->max_reading_pos = 0; + init_ranges (queue); } static void @@ -1108,6 +1292,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) case GST_EVENT_EOS: /* Zero the thresholds, this makes sure the queue is completely * filled and we can read all data from the queue. */ + GST_DEBUG_OBJECT (queue, "we have EOS"); queue->is_eos = TRUE; break; case GST_EVENT_NEWSEGMENT: @@ -1245,39 +1430,50 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event) case GST_EVENT_FLUSH_START: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event"); - /* forward event */ - gst_pad_push_event (queue->srcpad, event); + if (!QUEUE_IS_USING_TEMP_FILE (queue)) { + /* forward event */ + gst_pad_push_event (queue->srcpad, event); - /* now unblock the chain function */ - GST_QUEUE2_MUTEX_LOCK (queue); - queue->srcresult = GST_FLOW_WRONG_STATE; - /* unblock the loop and chain functions */ - g_cond_signal (queue->item_add); - g_cond_signal (queue->item_del); - GST_QUEUE2_MUTEX_UNLOCK (queue); + /* now unblock the chain function */ + GST_QUEUE2_MUTEX_LOCK (queue); + queue->srcresult = GST_FLOW_WRONG_STATE; + /* unblock the loop and chain functions */ + g_cond_signal (queue->item_add); + g_cond_signal (queue->item_del); + GST_QUEUE2_MUTEX_UNLOCK (queue); - /* make sure it pauses, this should happen since we sent - * flush_start downstream. */ - gst_pad_pause_task (queue->srcpad); - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); + /* make sure it pauses, this should happen since we sent + * flush_start downstream. */ + gst_pad_pause_task (queue->srcpad); + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); + } goto done; } case GST_EVENT_FLUSH_STOP: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event"); - /* forward event */ - gst_pad_push_event (queue->srcpad, event); - GST_QUEUE2_MUTEX_LOCK (queue); - gst_queue2_locked_flush (queue); - queue->srcresult = GST_FLOW_OK; - queue->is_eos = FALSE; - queue->unexpected = FALSE; - /* reset rate counters */ - reset_rate_timer (queue); - gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop, - queue->srcpad); - GST_QUEUE2_MUTEX_UNLOCK (queue); + if (!QUEUE_IS_USING_TEMP_FILE (queue)) { + /* forward event */ + gst_pad_push_event (queue->srcpad, event); + + GST_QUEUE2_MUTEX_LOCK (queue); + gst_queue2_locked_flush (queue); + queue->srcresult = GST_FLOW_OK; + queue->is_eos = FALSE; + queue->unexpected = FALSE; + /* reset rate counters */ + reset_rate_timer (queue); + gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop, + queue->srcpad); + GST_QUEUE2_MUTEX_UNLOCK (queue); + } else { + GST_QUEUE2_MUTEX_LOCK (queue); + queue->segment_event_received = FALSE; + queue->is_eos = FALSE; + queue->unexpected = FALSE; + GST_QUEUE2_MUTEX_UNLOCK (queue); + } goto done; } default: @@ -1323,7 +1519,7 @@ gst_queue2_is_empty (GstQueue2 * queue) return FALSE; if (QUEUE_IS_USING_TEMP_FILE (queue)) { - return queue->writing_pos == queue->max_reading_pos; + return queue->current->writing_pos == queue->current->max_reading_pos; } else { if (queue->queue->length == 0) return TRUE; @@ -1626,7 +1822,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event) /* just forward upstream */ res = gst_pad_push_event (queue->sinkpad, event); } else { - /* when using a temp file, we unblock the pending read */ + /* when using a temp file, we eat the event */ res = TRUE; gst_event_unref (event); } @@ -1706,6 +1902,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) GST_DEBUG_OBJECT (queue, "buffering forwarded to peer"); } else { gint64 start, stop; + guint64 writing_pos; gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL); @@ -1721,20 +1918,22 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query) &duration)) goto peer_failed; + writing_pos = queue->current->writing_pos; + GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %" - G_GINT64_FORMAT, duration, queue->writing_pos); + G_GINT64_FORMAT, duration, writing_pos); start = 0; /* get our available data relative to the duration */ if (duration != -1) - stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration; + stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration; else stop = -1; break; } case GST_FORMAT_BYTES: start = 0; - stop = queue->writing_pos; + stop = queue->current->writing_pos; break; default: start = -1; @@ -1774,7 +1973,7 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length, GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing); length = (length == -1) ? DEFAULT_BUFFER_SIZE : length; - offset = (offset == -1) ? queue->reading_pos : offset; + offset = (offset == -1) ? queue->current->reading_pos : offset; /* function will block when the range is not yet available */ ret = gst_queue2_create_read (queue, offset, length, buffer); @@ -1887,12 +2086,14 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active) if (active) { if (QUEUE_IS_USING_TEMP_FILE (queue)) { + /* open the temp file now */ + result = gst_queue2_open_temp_location_file (queue); + GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating pull mode"); queue->srcresult = GST_FLOW_OK; queue->is_eos = FALSE; queue->unexpected = FALSE; - result = TRUE; GST_QUEUE2_MUTEX_UNLOCK (queue); } else { GST_QUEUE2_MUTEX_LOCK (queue); diff --git a/plugins/elements/gstqueue2.h b/plugins/elements/gstqueue2.h index 55ad92b331..a47f88340c 100644 --- a/plugins/elements/gstqueue2.h +++ b/plugins/elements/gstqueue2.h @@ -45,6 +45,7 @@ G_BEGIN_DECLS typedef struct _GstQueue2 GstQueue2; typedef struct _GstQueue2Size GstQueue2Size; typedef struct _GstQueue2Class GstQueue2Class; +typedef struct _GstQueue2Range GstQueue2Range; /* used to keep track of sizes (current and max) */ struct _GstQueue2Size @@ -55,6 +56,16 @@ struct _GstQueue2Size guint64 rate_time; }; +struct _GstQueue2Range +{ + GstQueue2Range *next; + + guint64 offset; + guint64 writing_pos; + guint64 reading_pos; + guint64 max_reading_pos; +}; + struct _GstQueue2 { GstElement element; @@ -112,14 +123,13 @@ struct _GstQueue2 gchar *temp_location; gboolean temp_remove; FILE *temp_file; - guint64 writing_pos; - guint64 reading_pos; - guint64 max_reading_pos; + /* list of downloaded areas and the current area */ + GstQueue2Range *ranges; + GstQueue2Range *current; /* we need this to send the first new segment event of the stream * because we can't save it on the file */ gboolean segment_event_received; GstEvent *starting_segment; - }; struct _GstQueue2Class