dashdemux: download the next fragment with smaller timestamp

Instead of downloading 1 fragment per stream per download loop,
select the stream with the earlier timestamp and get a fragment
only for that one.

The old algorithm would lead to problems when the fragment durations
were too different for streams.
This commit is contained in:
Thiago Santos 2013-02-01 02:10:15 -03:00
parent dbd6bde9d1
commit 83c0e1e25d
4 changed files with 131 additions and 57 deletions

View file

@ -233,7 +233,7 @@ static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
static gboolean gst_dash_demux_select_representations (GstDashDemux * demux,
guint64 current_bitrate);
static gboolean gst_dash_demux_get_next_fragment_set (GstDashDemux * demux);
static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux);
static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
@ -1269,6 +1269,21 @@ gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
return (GstClockTime) level.time;
}
static gboolean
gst_dash_demux_all_streams_have_data (GstDashDemux * demux)
{
GSList *iter;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
if (!stream->has_data_queued)
return FALSE;
}
return TRUE;
}
/* gst_dash_demux_download_loop:
*
* Loop for the "download' task that fetches fragments based on the
@ -1384,11 +1399,13 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest);
/* try to switch to another set of representations if needed */
gst_dash_demux_select_representations (demux,
demux->bandwidth_usage * demux->dnl_rate);
if (gst_dash_demux_all_streams_have_data (demux)) {
gst_dash_demux_select_representations (demux,
demux->bandwidth_usage * demux->dnl_rate);
}
/* fetch the next fragment */
while (!gst_dash_demux_get_next_fragment_set (demux)) {
while (!gst_dash_demux_get_next_fragment (demux)) {
if (demux->end_of_period) {
GST_INFO_OBJECT (demux, "Reached the end of the Period");
/* setup video, audio and subtitle streams, starting from the next Period */
@ -1482,6 +1499,7 @@ gst_dash_demux_prepare_pad_switch (GstDashDemux * demux)
g_assert (caps != NULL);
gst_dash_demux_stream_push_event (stream,
gst_event_new_dash_event_pad_switch (caps));
stream->has_data_queued = FALSE;
}
}
@ -1672,9 +1690,9 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
}
}
/* gst_dash_demux_get_next_fragment_set:
/* gst_dash_demux_get_next_fragment:
*
* Get the next set of fragments for the current representations.
* Get the next fragments for the stream with the earlier timestamp.
*
* This function uses the generic URI downloader API.
*
@ -1682,11 +1700,10 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
*
*/
static gboolean
gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
gst_dash_demux_get_next_fragment (GstDashDemux * demux)
{
GstActiveStream *active_stream;
GstFragment *download, *header;
GList *fragment_set;
gchar *next_fragment_uri;
GstClockTime duration;
GstClockTime timestamp;
@ -1697,25 +1714,28 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
guint64 size_buffer = 0;
GSList *iter;
gboolean end_of_period = TRUE;
GstDashDemuxStream *selected_stream = NULL;
GstClockTime best_time = GST_CLOCK_TIME_NONE;
g_get_current_time (&start);
fragment_set = NULL;
/* Get the fragment corresponding to each stream index */
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
guint stream_idx = stream->index;
GstBuffer *buffer;
GstClockTime ts;
if (stream->download_end_of_period)
continue;
if (!gst_mpd_client_get_next_fragment (demux->client,
stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) {
if (gst_mpd_client_get_next_fragment_timestamp (demux->client,
stream->index, &ts)) {
if (ts < best_time || !GST_CLOCK_TIME_IS_VALID (best_time)) {
selected_stream = stream;
best_time = ts;
}
} else {
GstEvent *event = NULL;
GST_INFO_OBJECT (demux,
"This Period doesn't contain more fragments for stream %u",
stream_idx);
stream->index);
if (gst_mpd_client_has_next_period (demux->client)) {
event = gst_event_new_dash_eop ();
} else {
@ -1723,56 +1743,71 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
}
stream->download_end_of_period = TRUE;
gst_dash_demux_stream_push_event (stream, event);
continue;
}
}
GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
GST_INFO_OBJECT (demux,
"Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp),
GST_TIME_ARGS (duration));
g_get_current_time (&start);
/* Get the fragment corresponding to each stream index */
if (selected_stream) {
guint stream_idx = selected_stream->index;
GstBuffer *buffer;
/* got a fragment to fetch, no end of period */
end_of_period = FALSE;
if (gst_mpd_client_get_next_fragment (demux->client,
stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) {
download = gst_uri_downloader_fetch_uri (demux->downloader,
next_fragment_uri);
g_free (next_fragment_uri);
GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
GST_INFO_OBJECT (demux,
"Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp),
GST_TIME_ARGS (duration));
if (download == NULL)
return FALSE;
/* got a fragment to fetch, no end of period */
end_of_period = FALSE;
buffer = gst_fragment_get_buffer (download);
download = gst_uri_downloader_fetch_uri (demux->downloader,
next_fragment_uri);
g_free (next_fragment_uri);
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
if (stream == NULL) /* TODO unref fragments */
return FALSE;
if (download == NULL)
return FALSE;
if (stream->need_header) {
/* We need to fetch a new header */
if ((header = gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
GST_INFO_OBJECT (demux, "Unable to fetch header");
} else {
GstBuffer *header_buffer;
/* Replace fragment with a new one including the header */
buffer = gst_fragment_get_buffer (download);
header_buffer = gst_fragment_get_buffer (header);
buffer = gst_buffer_join (header_buffer, buffer);
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
if (active_stream == NULL) /* TODO unref fragments */
return FALSE;
if (selected_stream->need_header) {
/* We need to fetch a new header */
if ((header =
gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
GST_INFO_OBJECT (demux, "Unable to fetch header");
} else {
GstBuffer *header_buffer;
/* Replace fragment with a new one including the header */
header_buffer = gst_fragment_get_buffer (header);
buffer = gst_buffer_join (header_buffer, buffer);
}
selected_stream->need_header = FALSE;
}
stream->need_header = FALSE;
buffer = gst_buffer_make_metadata_writable (buffer);
GST_BUFFER_TIMESTAMP (buffer) = timestamp;
GST_BUFFER_DURATION (buffer) = duration;
GST_BUFFER_OFFSET (buffer) =
gst_mpd_client_get_segment_index (active_stream) - 1;
gst_buffer_set_caps (buffer, selected_stream->input_caps);
gst_dash_demux_stream_push_data (selected_stream, buffer);
selected_stream->has_data_queued = TRUE;
size_buffer += GST_BUFFER_SIZE (buffer);
} else {
GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d",
selected_stream, selected_stream->index);
}
buffer = gst_buffer_make_metadata_writable (buffer);
GST_BUFFER_TIMESTAMP (buffer) = timestamp;
GST_BUFFER_DURATION (buffer) = duration;
GST_BUFFER_OFFSET (buffer) =
gst_mpd_client_get_segment_index (active_stream) - 1;
gst_buffer_set_caps (buffer, stream->input_caps);
gst_dash_demux_stream_push_data (stream, buffer);
size_buffer += GST_BUFFER_SIZE (buffer);
}
demux->end_of_period = end_of_period;

View file

@ -78,11 +78,20 @@ struct _GstDashDemuxStream
gboolean stream_end_of_period;
gboolean stream_eos;
gboolean need_header;
gboolean need_segment;
/* tracks if a stream has enqueued data
* after a pad switch.
* This is required to prevent pads being
* added to the demuxer and having no data
* pushed to it before another pad switch
* as this might make downstream elements
* unhappy and error out if they get
* an EOS without receiving any input
*/
gboolean has_data_queued;
GstDataQueue *queue;
};

View file

@ -3129,6 +3129,35 @@ gst_mpd_client_setup_streaming (GstMpdClient * client,
return TRUE;
}
gboolean
gst_mpd_client_get_next_fragment_timestamp (GstMpdClient * client,
guint stream_idx, GstClockTime * ts)
{
GstActiveStream *stream;
gint segment_idx;
GstMediaSegment *currentChunk;
GST_DEBUG ("Stream index: %i", stream_idx);
stream = g_list_nth_data (client->active_streams, stream_idx);
g_return_val_if_fail (stream != NULL, 0);
GST_MPD_CLIENT_LOCK (client);
segment_idx = gst_mpd_client_get_segment_index (stream);
GST_DEBUG ("Looking for fragment sequence chunk %d", segment_idx);
currentChunk =
gst_mpdparser_get_chunk_by_index (client, stream_idx, segment_idx);
if (currentChunk == NULL) {
GST_MPD_CLIENT_UNLOCK (client);
return FALSE;
}
*ts = currentChunk->start_time;
GST_MPD_CLIENT_UNLOCK (client);
return TRUE;
}
gboolean
gst_mpd_client_get_next_fragment (GstMpdClient * client,
guint indexStream, gboolean * discontinuity, gchar ** uri,

View file

@ -470,6 +470,7 @@ gboolean gst_mpd_client_setup_representation (GstMpdClient *client, GstActiveStr
GstClockTime gst_mpd_client_get_current_position (GstMpdClient *client);
GstClockTime gst_mpd_client_get_next_fragment_duration (GstMpdClient * client);
GstClockTime gst_mpd_client_get_media_presentation_duration (GstMpdClient *client);
gboolean gst_mpd_client_get_next_fragment_timestamp (GstMpdClient * client, guint stream_idx, GstClockTime * ts);
gboolean gst_mpd_client_get_next_fragment (GstMpdClient *client, guint indexStream, gboolean *discontinuity, gchar **uri, GstClockTime *duration, GstClockTime *timestamp);
gboolean gst_mpd_client_get_next_header (GstMpdClient *client, const gchar **uri, guint stream_idx);
gboolean gst_mpd_client_is_live (GstMpdClient * client);