dashdemux: keep a list of streams periods

Keep a list of streams per period so that the download loop can keep
downloading while the stream loop is still pushing old period's data.
This commit is contained in:
Andre Moreira Magalhaes (andrunko) 2013-02-13 02:13:23 -02:00 committed by Thiago Santos
parent b9272dc0e6
commit fbbf713e45
2 changed files with 133 additions and 45 deletions

View file

@ -216,14 +216,19 @@ static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux); static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
static gboolean gst_dash_demux_select_representations (GstDashDemux * demux); static gboolean gst_dash_demux_select_representations (GstDashDemux * demux);
static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux); static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux);
static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
static void gst_dash_demux_expose_streams (GstDashDemux * demux);
static void gst_dash_demux_remove_streams (GstDashDemux * demux,
GSList * streams);
static void gst_dash_demux_stream_free (GstDashDemuxStream * stream);
static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose); static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux); static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux, static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
GstActiveStream * stream); GstActiveStream * stream);
static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
* stream); * stream);
static void gst_dash_demux_create_pads (GstDashDemux * demux); static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux);
static void static void
_do_init (GType type) _do_init (GType type)
@ -262,13 +267,13 @@ gst_dash_demux_dispose (GObject * obj)
if (demux->stream_task) { if (demux->stream_task) {
gst_object_unref (demux->stream_task); gst_object_unref (demux->stream_task);
g_static_rec_mutex_free (&demux->stream_lock); g_static_rec_mutex_free (&demux->stream_task_lock);
demux->stream_task = NULL; demux->stream_task = NULL;
} }
if (demux->download_task) { if (demux->download_task) {
gst_object_unref (demux->download_task); gst_object_unref (demux->download_task);
g_static_rec_mutex_free (&demux->download_lock); g_static_rec_mutex_free (&demux->download_task_lock);
demux->download_task = NULL; demux->download_task = NULL;
} }
@ -338,18 +343,20 @@ gst_dash_demux_init (GstDashDemux * demux, GstDashDemuxClass * klass)
demux->max_bitrate = DEFAULT_MAX_BITRATE; demux->max_bitrate = DEFAULT_MAX_BITRATE;
/* Updates task */ /* Updates task */
g_static_rec_mutex_init (&demux->download_lock); g_static_rec_mutex_init (&demux->download_task_lock);
demux->download_task = demux->download_task =
gst_task_create ((GstTaskFunction) gst_dash_demux_download_loop, demux); gst_task_create ((GstTaskFunction) gst_dash_demux_download_loop, demux);
gst_task_set_lock (demux->download_task, &demux->download_lock); gst_task_set_lock (demux->download_task, &demux->download_task_lock);
demux->download_timed_lock = g_mutex_new (); demux->download_timed_lock = g_mutex_new ();
/* Streaming task */ /* Streaming task */
g_static_rec_mutex_init (&demux->stream_lock); g_static_rec_mutex_init (&demux->stream_task_lock);
demux->stream_task = demux->stream_task =
gst_task_create ((GstTaskFunction) gst_dash_demux_stream_loop, demux); gst_task_create ((GstTaskFunction) gst_dash_demux_stream_loop, demux);
gst_task_set_lock (demux->stream_task, &demux->stream_lock); gst_task_set_lock (demux->stream_task, &demux->stream_task_lock);
demux->stream_timed_lock = g_mutex_new (); demux->stream_timed_lock = g_mutex_new ();
g_static_mutex_init (&demux->streams_lock);
} }
static void static void
@ -528,9 +535,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
gst_dash_demux_stop (demux); gst_dash_demux_stop (demux);
/* Wait for streaming to finish */ /* Wait for streaming to finish */
g_static_rec_mutex_lock (&demux->stream_lock); g_static_rec_mutex_lock (&demux->stream_task_lock);
//GST_MPD_CLIENT_LOCK (demux->client);
/* select the requested Period in the Media Presentation */ /* select the requested Period in the Media Presentation */
target_pos = (GstClockTime) demux->segment.start; target_pos = (GstClockTime) demux->segment.start;
@ -550,11 +555,22 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
return FALSE; return FALSE;
} }
if (current_period != gst_mpd_client_get_period_index (demux->client)) { if (current_period != gst_mpd_client_get_period_index (demux->client)) {
GSList *streams = NULL;
GST_DEBUG_OBJECT (demux, "Seeking to Period %d", current_period); GST_DEBUG_OBJECT (demux, "Seeking to Period %d", current_period);
streams = demux->streams;
demux->streams = NULL;
/* clean old active stream list, if any */
gst_active_streams_free (demux->client);
/* setup video, audio and subtitle streams, starting from the new Period */ /* setup video, audio and subtitle streams, starting from the new Period */
if (!gst_mpd_client_set_period_index (demux->client, current_period) if (!gst_mpd_client_set_period_index (demux->client, current_period)
|| !gst_dash_demux_setup_all_streams (demux)) || !gst_dash_demux_setup_all_streams (demux))
return FALSE; return FALSE;
gst_dash_demux_expose_streams (demux);
gst_dash_demux_remove_streams (demux, streams);
} }
if (list == NULL) { if (list == NULL) {
@ -614,7 +630,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
gst_uri_downloader_reset (demux->downloader); gst_uri_downloader_reset (demux->downloader);
gst_dash_demux_resume_download_task (demux); gst_dash_demux_resume_download_task (demux);
gst_dash_demux_resume_stream_task (demux); gst_dash_demux_resume_stream_task (demux);
g_static_rec_mutex_unlock (&demux->stream_lock); g_static_rec_mutex_unlock (&demux->stream_task_lock);
} }
return TRUE; return TRUE;
@ -632,6 +648,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
GList *listLang = NULL; GList *listLang = NULL;
guint i, nb_audio; guint i, nb_audio;
gchar *lang; gchar *lang;
GSList *streams = NULL;
GST_MPD_CLIENT_LOCK (demux->client); GST_MPD_CLIENT_LOCK (demux->client);
/* clean old active stream list, if any */ /* clean old active stream list, if any */
@ -645,7 +662,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
&listLang); &listLang);
if (nb_audio == 0) if (nb_audio == 0)
nb_audio = 1; nb_audio = 1;
GST_INFO_OBJECT (demux, "Number of language is=%d", nb_audio); GST_INFO_OBJECT (demux, "Number of languages is=%d", nb_audio);
for (i = 0; i < nb_audio; i++) { for (i = 0; i < nb_audio; i++) {
lang = (gchar *) g_list_nth_data (listLang, i); lang = (gchar *) g_list_nth_data (listLang, i);
@ -686,12 +703,12 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
DOWNLOAD_RATE_HISTORY_MAX); DOWNLOAD_RATE_HISTORY_MAX);
GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps); GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps);
demux->streams = g_slist_prepend (demux->streams, stream); streams = g_slist_prepend (streams, stream);
stream->pad = gst_dash_demux_create_pad (demux);
} }
demux->streams = g_slist_reverse (demux->streams); streams = g_slist_reverse (streams);
gst_dash_demux_create_pads (demux);
demux->next_periods = g_slist_append (demux->next_periods, streams);
GST_MPD_CLIENT_UNLOCK (demux->client); GST_MPD_CLIENT_UNLOCK (demux->client);
return TRUE; return TRUE;
@ -759,6 +776,8 @@ gst_dash_demux_sink_event (GstPad * pad, GstEvent * event)
!gst_dash_demux_setup_all_streams (demux)) !gst_dash_demux_setup_all_streams (demux))
return FALSE; return FALSE;
gst_dash_demux_advance_period (demux);
/* start playing from the first segment */ /* start playing from the first segment */
gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0); gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
@ -845,6 +864,7 @@ gst_dash_demux_src_query (GstPad * pad, GstQuery * query)
gst_query_set_seeking (query, fmt, gst_query_set_seeking (query, fmt,
!gst_mpd_client_is_live (dashdemux->client), 0, stop); !gst_mpd_client_is_live (dashdemux->client), 0, stop);
ret = TRUE; ret = TRUE;
GST_DEBUG_OBJECT (dashdemux, "GST_QUERY_SEEKING returning with stop : %" GST_DEBUG_OBJECT (dashdemux, "GST_QUERY_SEEKING returning with stop : %"
GST_TIME_FORMAT, GST_TIME_ARGS (stop)); GST_TIME_FORMAT, GST_TIME_ARGS (stop));
} }
@ -891,15 +911,15 @@ gst_dash_demux_stop (GstDashDemux * demux)
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) { if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->download_task); GST_TASK_SIGNAL (demux->download_task);
gst_task_stop (demux->download_task); gst_task_stop (demux->download_task);
g_static_rec_mutex_lock (&demux->download_lock); g_static_rec_mutex_lock (&demux->download_task_lock);
g_static_rec_mutex_unlock (&demux->download_lock); g_static_rec_mutex_unlock (&demux->download_task_lock);
gst_task_join (demux->download_task); gst_task_join (demux->download_task);
} }
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->stream_task); GST_TASK_SIGNAL (demux->stream_task);
gst_task_stop (demux->stream_task); gst_task_stop (demux->stream_task);
g_static_rec_mutex_lock (&demux->stream_lock); g_static_rec_mutex_lock (&demux->stream_task_lock);
g_static_rec_mutex_unlock (&demux->stream_lock); g_static_rec_mutex_unlock (&demux->stream_task_lock);
gst_task_join (demux->stream_task); gst_task_join (demux->stream_task);
} }
@ -911,32 +931,89 @@ gst_dash_demux_stop (GstDashDemux * demux)
} }
} }
static GstPad *
gst_dash_demux_create_pad (GstDashDemux * demux)
{
GstPad *pad;
/* Create and activate new pads */
pad = gst_pad_new_from_static_template (&srctemplate, NULL);
gst_pad_set_event_function (pad,
GST_DEBUG_FUNCPTR (gst_dash_demux_src_event));
gst_pad_set_query_function (pad,
GST_DEBUG_FUNCPTR (gst_dash_demux_src_query));
gst_pad_set_element_private (pad, demux);
gst_pad_set_active (pad, TRUE);
GST_INFO_OBJECT (demux, "Creating srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
return pad;
}
static void static void
gst_dash_demux_create_pads (GstDashDemux * demux) gst_dash_demux_expose_streams (GstDashDemux * demux)
{ {
GSList *iter; GSList *iter;
/* Create and activate new pads */
for (iter = demux->streams; iter; iter = g_slist_next (iter)) { for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
GST_LOG_OBJECT (demux, "Exposing stream %d %" GST_PTR_FORMAT, stream->index,
stream->input_caps);
g_assert (stream->pad == NULL); g_assert (stream->pad == NULL);
stream->pad = gst_pad_new_from_static_template (&srctemplate, NULL);
gst_pad_set_event_function (stream->pad,
GST_DEBUG_FUNCPTR (gst_dash_demux_src_event));
gst_pad_set_query_function (stream->pad,
GST_DEBUG_FUNCPTR (gst_dash_demux_src_query));
gst_pad_set_element_private (stream->pad, demux);
gst_pad_set_active (stream->pad, TRUE);
GST_INFO_OBJECT (demux, "Adding srcpad %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad)); gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad));
} }
gst_element_no_more_pads (GST_ELEMENT_CAST (demux)); gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
} }
static void
gst_dash_demux_remove_streams (GstDashDemux * demux, GSList * streams)
{
GSList *iter;
GstEvent *eos = gst_event_new_eos ();
for (iter = streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;;
GST_LOG_OBJECT (demux, "Removing stream %d %" GST_PTR_FORMAT, stream->index,
stream->input_caps);
g_assert (stream->pad == NULL);
gst_pad_push_event (stream->pad, gst_event_ref (eos));
gst_pad_set_active (stream->pad, FALSE);
gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
gst_dash_demux_stream_free (stream);
}
gst_event_unref (eos);
g_slist_free (streams);
}
static gboolean
gst_dash_demux_advance_period (GstDashDemux * demux)
{
GSList *old_period = NULL;
g_static_mutex_lock (&demux->streams_lock);
if (demux->streams) {
g_assert (demux->streams == demux->next_periods->data);
demux->next_periods = g_slist_remove (demux->next_periods, demux->streams);
old_period = demux->streams;
demux->streams = NULL;
}
if (demux->next_periods) {
demux->streams = demux->next_periods->data;
} else {
GST_DEBUG_OBJECT (demux, "No next periods, return FALSE");
g_static_mutex_unlock (&demux->streams_lock);
return FALSE;
}
gst_dash_demux_expose_streams (demux);
gst_dash_demux_remove_streams (demux, old_period);
g_static_mutex_unlock (&demux->streams_lock);
return TRUE;
}
/* gst_dash_demux_stream_loop: /* gst_dash_demux_stream_loop:
* *
* Loop for the "stream' task that pushes fragments to the src pads. * Loop for the "stream' task that pushes fragments to the src pads.
@ -962,7 +1039,6 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
{ {
GstFlowReturn ret; GstFlowReturn ret;
GstActiveStream *active_stream; GstActiveStream *active_stream;
guint i = 0;
GSList *iter; GSList *iter;
GstClockTime best_time; GstClockTime best_time;
GstDashDemuxStream *selected_stream; GstDashDemuxStream *selected_stream;
@ -973,7 +1049,8 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
best_time = GST_CLOCK_TIME_NONE; best_time = GST_CLOCK_TIME_NONE;
selected_stream = NULL; selected_stream = NULL;
for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
GstDataQueueItem *item; GstDataQueueItem *item;
@ -1030,8 +1107,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
selected_stream->index); selected_stream->index);
if (demux->need_segment) { if (demux->need_segment) {
/* And send a newsegment */ /* And send a newsegment */
for (iter = demux->streams, i = 0; iter; for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
i++, iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
gst_pad_push_event (stream->pad, gst_pad_push_event (stream->pad,
gst_event_new_new_segment (FALSE, demux->segment.rate, gst_event_new_new_segment (FALSE, demux->segment.rate,
@ -1043,9 +1119,10 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
GST_DEBUG_OBJECT (demux, GST_DEBUG_OBJECT (demux,
"Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%" "Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT, buffer, GST_BUFFER_OFFSET (buffer), GST_TIME_FORMAT " at pad: %s:%s", buffer, GST_BUFFER_OFFSET (buffer),
selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
GST_DEBUG_PAD_NAME (selected_stream->pad));
ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer)); ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer));
gst_segment_set_last_stop (&demux->segment, GST_FORMAT_TIME, gst_segment_set_last_stop (&demux->segment, GST_FORMAT_TIME,
GST_BUFFER_TIMESTAMP (buffer)); GST_BUFFER_TIMESTAMP (buffer));
@ -1072,7 +1149,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
if (eos) { if (eos) {
goto end_of_manifest; goto end_of_manifest;
} else if (eop) { } else if (eop) {
/* TODO advance to next period */ gst_dash_demux_advance_period (demux);
} }
} }
@ -1294,7 +1371,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
stream = gst_mpdparser_get_active_stream_by_index (demux->client, 0); stream = gst_mpdparser_get_active_stream_by_index (demux->client, 0);
segment_index = gst_mpd_client_get_segment_index (stream); segment_index = gst_mpd_client_get_segment_index (stream);
/* setup video, audio and subtitle streams, starting from first Period */ /* setup video, audio and subtitle streams, starting from current Period */
if (!gst_mpd_client_setup_media_presentation (demux->client) || if (!gst_mpd_client_setup_media_presentation (demux->client) ||
!gst_mpd_client_set_period_index (demux->client, !gst_mpd_client_set_period_index (demux->client,
gst_mpd_client_get_period_index (demux->client)) gst_mpd_client_get_period_index (demux->client))
@ -1303,7 +1380,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
"Error setting up the updated manifest file"); "Error setting up the updated manifest file");
goto end_of_manifest; goto end_of_manifest;
} }
/* continue playing from the the next segment */ /* continue playing from the next segment */
/* FIXME: support multiple streams with different segment duration */ /* FIXME: support multiple streams with different segment duration */
gst_mpd_client_set_segment_index_for_all_streams (demux->client, gst_mpd_client_set_segment_index_for_all_streams (demux->client,
segment_index); segment_index);
@ -1635,8 +1712,15 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
gboolean end_of_period = TRUE; gboolean end_of_period = TRUE;
GstDashDemuxStream *selected_stream = NULL; GstDashDemuxStream *selected_stream = NULL;
GstClockTime best_time = GST_CLOCK_TIME_NONE; GstClockTime best_time = GST_CLOCK_TIME_NONE;
GSList *streams;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) { g_static_mutex_lock (&demux->streams_lock);
/* TODO add check */
streams = g_slist_last (demux->next_periods)->data;
g_static_mutex_unlock (&demux->streams_lock);
for (iter = streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data; GstDashDemuxStream *stream = iter->data;
GstClockTime ts; GstClockTime ts;
@ -1658,6 +1742,8 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
if (gst_mpd_client_has_next_period (demux->client)) { if (gst_mpd_client_has_next_period (demux->client)) {
event = gst_event_new_dash_eop (); event = gst_event_new_dash_eop ();
} else { } else {
GST_DEBUG_OBJECT (demux,
"No more fragments or periods for this stream, setting EOS");
event = gst_event_new_eos (); event = gst_event_new_eos ();
} }
stream->download_end_of_period = TRUE; stream->download_end_of_period = TRUE;

View file

@ -107,6 +107,8 @@ struct _GstDashDemux
GstPad *sinkpad; GstPad *sinkpad;
GSList *streams; GSList *streams;
GSList *next_periods;
GStaticMutex streams_lock;
GstSegment segment; GstSegment segment;
gboolean need_segment; gboolean need_segment;
@ -124,12 +126,12 @@ struct _GstDashDemux
/* Streaming task */ /* Streaming task */
GstTask *stream_task; GstTask *stream_task;
GStaticRecMutex stream_lock; GStaticRecMutex stream_task_lock;
GMutex *stream_timed_lock; GMutex *stream_timed_lock;
/* Download task */ /* Download task */
GstTask *download_task; GstTask *download_task;
GStaticRecMutex download_lock; GStaticRecMutex download_task_lock;
gboolean cancelled; gboolean cancelled;
GMutex *download_timed_lock; GMutex *download_timed_lock;