dashdemux: Use 1 download task per stream

Instead of having a single download task for all streams, this
commit makes each stream have its own download loop, allowing
parallel download of fragments.
This commit is contained in:
Thiago Santos 2013-12-03 16:16:09 -03:00
parent 3bd66d17fc
commit 31e4ba0094
2 changed files with 185 additions and 214 deletions

View file

@ -181,6 +181,9 @@ enum
#define DEFAULT_FAILED_COUNT 3
#define DOWNLOAD_RATE_HISTORY_MAX 3
#define GST_DASH_DEMUX_DOWNLOAD_LOCK(d) g_mutex_lock (&d->download_mutex)
#define GST_DASH_DEMUX_DOWNLOAD_UNLOCK(d) g_mutex_unlock (&d->download_mutex)
/* Custom internal event to signal end of period */
#define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED)
static GstEvent *
@ -210,14 +213,17 @@ static gboolean gst_dash_demux_src_event (GstPad * pad, GstObject * parent,
static gboolean gst_dash_demux_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static void gst_dash_demux_stream_loop (GstDashDemux * demux);
static void gst_dash_demux_download_loop (GstDashDemux * demux);
static void gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream);
static void gst_dash_demux_stop (GstDashDemux * demux);
static void gst_dash_demux_resume_stream_task (GstDashDemux * demux);
static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
static gboolean gst_dash_demux_select_representations (GstDashDemux * demux);
static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux,
GstActiveStream ** stream, GstClockTime * next_ts);
static gboolean
gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
stream);
static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux,
GstDashDemuxStream * stream, GstActiveStream ** active_stream,
GstClockTime * next_ts);
static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
static void gst_dash_demux_download_wait (GstDashDemux * demux,
GstClockTime time_diff);
@ -255,11 +261,6 @@ gst_dash_demux_dispose (GObject * obj)
demux->stream_task = NULL;
}
if (demux->download_task) {
gst_object_unref (demux->download_task);
g_rec_mutex_clear (&demux->download_task_lock);
demux->download_task = NULL;
}
g_cond_clear (&demux->download_cond);
g_mutex_clear (&demux->download_mutex);
@ -345,11 +346,6 @@ gst_dash_demux_init (GstDashDemux * demux)
demux->max_bitrate = DEFAULT_MAX_BITRATE;
/* Updates task */
g_rec_mutex_init (&demux->download_task_lock);
demux->download_task =
gst_task_new ((GstTaskFunction) gst_dash_demux_download_loop, demux,
NULL);
gst_task_set_lock (demux->download_task, &demux->download_task_lock);
g_cond_init (&demux->download_cond);
g_mutex_init (&demux->download_mutex);
@ -696,11 +692,18 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
continue;
stream = g_new0 (GstDashDemuxStream, 1);
stream->demux = demux;
caps = gst_dash_demux_get_input_caps (demux, active_stream);
stream->queue =
gst_data_queue_new ((GstDataQueueCheckFullFunction) _check_queue_full,
NULL, NULL, demux);
g_rec_mutex_init (&stream->download_task_lock);
stream->download_task =
gst_task_new ((GstTaskFunction) gst_dash_demux_stream_download_loop,
stream, NULL);
gst_task_set_lock (stream->download_task, &stream->download_task_lock);
stream->index = i;
stream->input_caps = caps;
stream->need_header = TRUE;
@ -1014,18 +1017,18 @@ gst_dash_demux_stop (GstDashDemux * demux)
GstDashDemuxStream *stream = iter->data;
gst_data_queue_set_flushing (stream->queue, TRUE);
if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (stream->download_task);
gst_task_stop (stream->download_task);
g_mutex_lock (&demux->download_mutex);
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_mutex);
g_rec_mutex_lock (&stream->download_task_lock);
g_rec_mutex_unlock (&stream->download_task_lock);
gst_task_join (stream->download_task);
}
}
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->download_task);
gst_task_stop (demux->download_task);
g_mutex_lock (&demux->download_mutex);
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_mutex);
g_rec_mutex_lock (&demux->download_task_lock);
g_rec_mutex_unlock (&demux->download_task_lock);
gst_task_join (demux->download_task);
}
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->stream_task);
gst_task_stop (demux->stream_task);
@ -1309,12 +1312,7 @@ flushing:
end_of_manifest:
{
GST_INFO_OBJECT (demux, "Reached end of manifest, sending EOS");
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_pad_push_event (stream->pad, gst_event_new_eos ());
}
GST_INFO_OBJECT (demux, "Stopped streaming task");
GST_INFO_OBJECT (demux, "Reached end of manifest, stopping streaming task");
gst_task_stop (demux->stream_task);
return;
}
@ -1346,6 +1344,12 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream)
g_object_unref (stream->queue);
stream->queue = NULL;
}
if (stream->download_task) {
gst_task_stop (stream->download_task);
gst_task_join (stream->download_task);
gst_object_unref (stream->download_task);
g_rec_mutex_clear (&stream->download_task_lock);
}
g_free (stream);
}
@ -1437,21 +1441,6 @@ gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
}
#endif
static gboolean
gst_dash_demux_all_streams_have_data (GstDashDemux * demux)
{
GSList *iter;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
if (!stream->has_data_queued)
return FALSE;
}
return TRUE;
}
static GstFlowReturn
gst_dash_demux_refresh_mpd (GstDashDemux * demux)
{
@ -1605,7 +1594,7 @@ gst_dash_demux_refresh_mpd (GstDashDemux * demux)
return GST_FLOW_OK;
}
/* gst_dash_demux_download_loop:
/* gst_dash_demux_stream_download_loop:
*
* Loop for the "download' task that fetches fragments based on the
* selected representations.
@ -1633,18 +1622,23 @@ gst_dash_demux_refresh_mpd (GstDashDemux * demux)
* manifest has been reached.
*
*/
void
gst_dash_demux_download_loop (GstDashDemux * demux)
static void
gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
{
GstClockTime fragment_ts = GST_CLOCK_TIME_NONE;
GstActiveStream *fragment_stream = NULL;
GstDashDemux *demux = stream->demux;
GstFlowReturn flow_ret = GST_FLOW_OK;
GST_LOG_OBJECT (demux, "Starting download loop");
GST_LOG_OBJECT (demux, "Starting download loop %p %s:%s", stream,
GST_DEBUG_PAD_NAME (stream->pad));
GST_DASH_DEMUX_DOWNLOAD_LOCK (demux);
if (gst_mpd_client_is_live (demux->client)
&& demux->client->mpd_uri != NULL) {
switch (gst_dash_demux_refresh_mpd (demux)) {
case GST_FLOW_EOS:
GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux);
goto end_of_manifest;
default:
break;
@ -1654,31 +1648,34 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest);
/* try to switch to another set of representations if needed */
if (gst_dash_demux_all_streams_have_data (demux)) {
gst_dash_demux_select_representations (demux);
}
gst_dash_demux_stream_select_representation_unlocked (stream);
GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux);
/* fetch the next fragment */
while (!gst_dash_demux_get_next_fragment (demux, &fragment_stream,
&fragment_ts)) {
if (demux->end_of_period) {
GST_INFO_OBJECT (demux, "Reached the end of the Period");
/* setup video, audio and subtitle streams, starting from the next Period */
if (!gst_mpd_client_set_period_index (demux->client,
gst_mpd_client_get_period_index (demux->client) + 1)
|| !gst_dash_demux_setup_all_streams (demux)) {
GST_INFO_OBJECT (demux, "Reached the end of the manifest file");
demux->end_of_manifest = TRUE;
gst_task_start (demux->stream_task);
goto end_of_manifest;
}
/* start playing from the first segment of the new period */
gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
demux->end_of_period = FALSE;
flow_ret = gst_dash_demux_get_next_fragment (demux, stream, &fragment_stream,
&fragment_ts);
} else if (demux->cancelled) {
goto cancelled;
} else {
switch (flow_ret) {
case GST_FLOW_OK:
break;
case GST_FLOW_EOS:
if (demux->end_of_period) {
GST_INFO_OBJECT (demux, "Reached the end of the Period");
/* setup video, audio and subtitle streams, starting from the next Period */
if (!gst_mpd_client_set_period_index (demux->client,
gst_mpd_client_get_period_index (demux->client) + 1)
|| !gst_dash_demux_setup_all_streams (demux)) {
GST_INFO_OBJECT (demux, "Reached the end of the manifest file");
demux->end_of_manifest = TRUE;
gst_task_start (demux->stream_task);
goto end_of_manifest;
}
/* start playing from the first segment of the new period */
gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
demux->end_of_period = FALSE;
}
break;
case GST_FLOW_ERROR:
/* Download failed 'by itself'
* in case this is live, we might be ahead or before playback, where
* segments don't exist (are still being created or were already deleted)
@ -1727,7 +1724,13 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
} else {
goto error_downloading;
}
}
break;
default:
break;
}
if (demux->cancelled) {
goto cancelled;
}
GST_INFO_OBJECT (demux, "Internal buffering : %" G_GUINT64_FORMAT " s",
@ -1741,14 +1744,14 @@ quit:
cancelled:
{
GST_WARNING_OBJECT (demux, "Cancelled, leaving download task");
gst_task_stop (demux->download_task);
gst_task_stop (stream->download_task);
return;
}
end_of_manifest:
{
GST_INFO_OBJECT (demux, "End of manifest, leaving download task");
gst_task_stop (demux->download_task);
gst_task_stop (stream->download_task);
return;
}
@ -1756,7 +1759,7 @@ error_downloading:
{
GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
("Could not fetch the next fragment, leaving download task"), (NULL));
gst_task_stop (demux->download_task);
gst_task_stop (stream->download_task);
return;
}
}
@ -1770,87 +1773,74 @@ gst_dash_demux_resume_stream_task (GstDashDemux * demux)
static void
gst_dash_demux_resume_download_task (GstDashDemux * demux)
{
gst_task_start (demux->download_task);
GSList *iter;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_task_start (stream->download_task);
}
}
/* gst_dash_demux_select_representations:
/*
* gst_dash_demux_stream_select_representation_unlocked:
*
* Select the most appropriate media representations based on current target
* Select the most appropriate media representation based on current target
* bitrate.
*
* FIXME: all representations are selected against the same bitrate, but
* they will share the same bandwidth. This only works today because the
* audio representations bitrate usage is negligible as compared to the
* video representation one.
*
* Returns TRUE if a new set of representations has been selected
*/
static gboolean
gst_dash_demux_select_representations (GstDashDemux * demux)
gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
stream)
{
GstActiveStream *active_stream = NULL;
GList *rep_list = NULL;
gint new_index;
gboolean ret = FALSE;
GSList *iter;
GstDashDemuxStream *stream;
GstDashDemux *demux = stream->demux;
guint64 bitrate;
guint i = 0;
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream->index);
if (active_stream == NULL)
return FALSE;
GST_MPD_CLIENT_LOCK (demux->client);
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
guint64 bitrate;
stream = iter->data;
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream->index);
if (active_stream == NULL)
/* retrieve representation list */
if (active_stream->cur_adapt_set)
rep_list = active_stream->cur_adapt_set->Representations;
if (!rep_list)
return FALSE;
bitrate =
gst_download_rate_get_current_rate (&stream->dnl_rate) *
demux->bandwidth_usage;
GST_DEBUG_OBJECT (demux, "Trying to change to bitrate: %" G_GUINT64_FORMAT,
bitrate);
/* get representation index with current max_bandwidth */
new_index = gst_mpdparser_get_rep_idx_with_max_bandwidth (rep_list, bitrate);
/* if no representation has the required bandwidth, take the lowest one */
if (new_index == -1)
new_index = gst_mpdparser_get_rep_idx_with_min_bandwidth (rep_list);
if (new_index != active_stream->representation_idx) {
GstRepresentationNode *rep = g_list_nth_data (rep_list, new_index);
GST_INFO_OBJECT (demux, "Changing representation idx: %d %d %u",
stream->index, new_index, rep->bandwidth);
if (gst_mpd_client_setup_representation (demux->client, active_stream, rep)) {
stream->need_header = TRUE;
stream->has_data_queued = FALSE;
GST_INFO_OBJECT (demux, "Switching bitrate to %d",
active_stream->cur_representation->bandwidth);
gst_caps_unref (stream->input_caps);
stream->input_caps = gst_dash_demux_get_input_caps (demux, active_stream);
gst_dash_demux_stream_push_event (stream,
gst_event_new_caps (stream->input_caps));
return TRUE;
} else {
GST_WARNING_OBJECT (demux, "Can not switch representation, aborting...");
return FALSE;
/* retrieve representation list */
if (active_stream->cur_adapt_set)
rep_list = active_stream->cur_adapt_set->Representations;
if (!rep_list)
return FALSE;
bitrate =
gst_download_rate_get_current_rate (&stream->dnl_rate) *
demux->bandwidth_usage;
GST_DEBUG_OBJECT (demux, "Trying to change to bitrate: %" G_GUINT64_FORMAT,
bitrate);
/* get representation index with current max_bandwidth */
new_index =
gst_mpdparser_get_rep_idx_with_max_bandwidth (rep_list, bitrate);
/* if no representation has the required bandwidth, take the lowest one */
if (new_index == -1)
new_index = gst_mpdparser_get_rep_idx_with_min_bandwidth (rep_list);
if (new_index != active_stream->representation_idx) {
GstRepresentationNode *rep = g_list_nth_data (rep_list, new_index);
GST_INFO_OBJECT (demux, "Changing representation idx: %d %d %u",
stream->index, new_index, rep->bandwidth);
if (gst_mpd_client_setup_representation (demux->client, active_stream,
rep)) {
ret = TRUE;
stream->need_header = TRUE;
stream->has_data_queued = FALSE;
GST_INFO_OBJECT (demux, "Switching bitrate to %d",
active_stream->cur_representation->bandwidth);
gst_caps_unref (stream->input_caps);
stream->input_caps =
gst_dash_demux_get_input_caps (demux, active_stream);
gst_dash_demux_stream_push_event (stream,
gst_event_new_caps (stream->input_caps));
} else {
GST_WARNING_OBJECT (demux,
"Can not switch representation, aborting...");
}
}
i++;
}
GST_MPD_CLIENT_UNLOCK (demux->client);
return ret;
return FALSE;
}
static GstBuffer *
@ -2044,7 +2034,8 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux,
static gboolean
gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
GstDashDemuxStream * demux_stream, guint64 * size_buffer)
GstDashDemuxStream * demux_stream, guint64 * size_buffer,
GstClockTime * download_time)
{
GstActiveStream *active_stream;
GstFragment *download;
@ -2118,6 +2109,7 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
demux_stream->need_header = FALSE;
}
g_get_current_time (&now);
*download_time = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start));
buffer = gst_buffer_make_writable (buffer);
@ -2149,112 +2141,91 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
* Returns FALSE if an error occured while downloading fragments
*
*/
static gboolean
static GstFlowReturn
gst_dash_demux_get_next_fragment (GstDashDemux * demux,
GstActiveStream ** stream, GstClockTime * selected_ts)
GstDashDemuxStream * stream, GstActiveStream ** active_stream,
GstClockTime * selected_ts)
{
guint64 buffer_size = 0;
GTimeVal now;
GTimeVal start;
GstClockTime diff;
GSList *iter;
gboolean end_of_period = TRUE;
GstDashDemuxStream *selected_stream = NULL;
GstClockTime best_time = GST_CLOCK_TIME_NONE;
GSList *streams;
GstClockTime ts;
g_mutex_lock (&demux->streams_lock);
/* TODO add check */
streams = g_slist_last (demux->next_periods)->data;
g_mutex_unlock (&demux->streams_lock);
if (stream->download_end_of_period)
return GST_FLOW_EOS;
for (iter = streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
GstClockTime ts;
if (stream->download_end_of_period)
continue;
if (gst_mpd_client_get_next_fragment_timestamp (demux->client,
stream->index, &ts)) {
if (ts < best_time || !GST_CLOCK_TIME_IS_VALID (best_time)) {
selected_stream = stream;
best_time = ts;
}
} else {
GstEvent *event = NULL;
GST_INFO_OBJECT (demux,
"This Period doesn't contain more fragments for stream %u",
stream->index);
/* check if this is live and we should wait for more data */
if (gst_mpd_client_is_live (demux->client)
&& demux->client->mpd_node->minimumUpdatePeriod != -1) {
end_of_period = FALSE;
continue;
}
if (gst_mpd_client_has_next_period (demux->client)) {
event = gst_event_new_dash_eop ();
} else {
GST_DEBUG_OBJECT (demux,
"No more fragments or periods for this stream, setting EOS");
event = gst_event_new_eos ();
}
stream->download_end_of_period = TRUE;
gst_dash_demux_stream_push_event (stream, event);
}
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
GST_LOG_OBJECT (demux, "Skipping stream %p %s:%s : not-linked",
stream, GST_DEBUG_PAD_NAME (stream->pad));
return GST_FLOW_NOT_LINKED;
}
if (selected_ts)
*selected_ts = best_time;
if (stream && selected_stream)
*stream =
gst_mpdparser_get_active_stream_by_index (demux->client,
selected_stream->index);
if (gst_mpd_client_get_next_fragment_timestamp (demux->client,
stream->index, &ts)) {
} else {
GstEvent *event = NULL;
GST_INFO_OBJECT (demux,
"This Period doesn't contain more fragments for stream %u",
stream->index);
/* check if this is live and we should wait for more data */
if (gst_mpd_client_is_live (demux->client)
&& demux->client->mpd_node->minimumUpdatePeriod != -1) {
end_of_period = FALSE;
return GST_FLOW_OK; /* TODO wait */
}
if (gst_mpd_client_has_next_period (demux->client)) {
event = gst_event_new_dash_eop ();
} else {
GST_DEBUG_OBJECT (demux,
"No more fragments or periods for this stream, setting EOS");
event = gst_event_new_eos ();
}
stream->download_end_of_period = TRUE;
gst_dash_demux_stream_push_event (stream, event);
}
*active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream->index);
/*
* If this is a live stream, check the segment end time to make sure
* it is available to download
*/
if (selected_stream && gst_mpd_client_is_live (demux->client) &&
if (stream && gst_mpd_client_is_live (demux->client) &&
demux->client->mpd_node->minimumUpdatePeriod != -1) {
gst_dash_demux_wait_for_fragment_to_be_available (demux, *stream);
gst_dash_demux_wait_for_fragment_to_be_available (demux, *active_stream);
}
/* Get the fragment corresponding to each stream index */
if (selected_stream) {
g_get_current_time (&start);
gst_dash_demux_get_next_fragment_for_stream (demux, selected_stream,
&buffer_size);
g_get_current_time (&now);
if (stream) {
gst_dash_demux_get_next_fragment_for_stream (demux, stream, &buffer_size,
&diff);
end_of_period = FALSE;
}
demux->end_of_period = end_of_period;
if (end_of_period)
return FALSE;
return GST_FLOW_EOS;
/* Wake the download task up */
GST_TASK_SIGNAL (demux->download_task);
if (selected_stream) {
if (stream && buffer_size > 0 && diff > 0) {
#ifndef GST_DISABLE_GST_DEBUG
guint64 brate;
#endif
diff = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start));
gst_download_rate_add_rate (&selected_stream->dnl_rate, buffer_size, diff);
gst_download_rate_add_rate (&stream->dnl_rate, buffer_size, diff);
#ifndef GST_DISABLE_GST_DEBUG
brate = (buffer_size * 8) / ((double) diff / GST_SECOND);
#endif
GST_INFO_OBJECT (demux,
"Stream: %d Download rate = %" G_GUINT64_FORMAT " Kbits/s (%"
G_GUINT64_FORMAT " Ko in %.2f s)", selected_stream->index, brate / 1000,
G_GUINT64_FORMAT " Ko in %.2f s)", stream->index, brate / 1000,
buffer_size / 1024, ((double) diff / GST_SECOND));
}
return TRUE;
return GST_FLOW_OK;
}
static void

View file

@ -94,6 +94,8 @@ struct _GstDashDemuxStream
gboolean has_data_queued;
GstDataQueue *queue;
GstTask *download_task;
GRecMutex download_task_lock;
GstDownloadRate dnl_rate;
};
@ -135,8 +137,6 @@ struct _GstDashDemux
GRecMutex stream_task_lock;
/* Download task */
GstTask *download_task;
GRecMutex download_task_lock;
GMutex download_mutex;
GCond download_cond;
gboolean cancelled;