From fbbf713e4535713d664f093a6d27c1b02e532d10 Mon Sep 17 00:00:00 2001 From: "Andre Moreira Magalhaes (andrunko)" Date: Wed, 13 Feb 2013 02:13:23 -0200 Subject: [PATCH] dashdemux: keep a list of streams periods Keep a list of streams per period so that the download loop can keep downloading while the stream loop is still pushing old period's data. --- ext/dash/gstdashdemux.c | 172 ++++++++++++++++++++++++++++++---------- ext/dash/gstdashdemux.h | 6 +- 2 files changed, 133 insertions(+), 45 deletions(-) diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 3b04040605..2cea0023f3 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -216,14 +216,19 @@ static void gst_dash_demux_resume_download_task (GstDashDemux * demux); static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux); static gboolean gst_dash_demux_select_representations (GstDashDemux * demux); static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux); +static gboolean gst_dash_demux_advance_period (GstDashDemux * demux); +static void gst_dash_demux_expose_streams (GstDashDemux * demux); +static void gst_dash_demux_remove_streams (GstDashDemux * demux, + GSList * streams); +static void gst_dash_demux_stream_free (GstDashDemuxStream * stream); static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose); static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux); static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream); static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream); -static void gst_dash_demux_create_pads (GstDashDemux * demux); +static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux); static void _do_init (GType type) @@ -262,13 +267,13 @@ gst_dash_demux_dispose (GObject * obj) if (demux->stream_task) { gst_object_unref (demux->stream_task); - g_static_rec_mutex_free (&demux->stream_lock); + g_static_rec_mutex_free (&demux->stream_task_lock); demux->stream_task = NULL; } if (demux->download_task) { gst_object_unref (demux->download_task); - g_static_rec_mutex_free (&demux->download_lock); + g_static_rec_mutex_free (&demux->download_task_lock); demux->download_task = NULL; } @@ -338,18 +343,20 @@ gst_dash_demux_init (GstDashDemux * demux, GstDashDemuxClass * klass) demux->max_bitrate = DEFAULT_MAX_BITRATE; /* Updates task */ - g_static_rec_mutex_init (&demux->download_lock); + g_static_rec_mutex_init (&demux->download_task_lock); demux->download_task = gst_task_create ((GstTaskFunction) gst_dash_demux_download_loop, demux); - gst_task_set_lock (demux->download_task, &demux->download_lock); + gst_task_set_lock (demux->download_task, &demux->download_task_lock); demux->download_timed_lock = g_mutex_new (); /* Streaming task */ - g_static_rec_mutex_init (&demux->stream_lock); + g_static_rec_mutex_init (&demux->stream_task_lock); demux->stream_task = gst_task_create ((GstTaskFunction) gst_dash_demux_stream_loop, demux); - gst_task_set_lock (demux->stream_task, &demux->stream_lock); + gst_task_set_lock (demux->stream_task, &demux->stream_task_lock); demux->stream_timed_lock = g_mutex_new (); + + g_static_mutex_init (&demux->streams_lock); } static void @@ -528,9 +535,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event) gst_dash_demux_stop (demux); /* Wait for streaming to finish */ - g_static_rec_mutex_lock (&demux->stream_lock); - - //GST_MPD_CLIENT_LOCK (demux->client); + g_static_rec_mutex_lock (&demux->stream_task_lock); /* select the requested Period in the Media Presentation */ target_pos = (GstClockTime) demux->segment.start; @@ -550,11 +555,22 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event) return FALSE; } if (current_period != gst_mpd_client_get_period_index (demux->client)) { + GSList *streams = NULL; + GST_DEBUG_OBJECT (demux, "Seeking to Period %d", current_period); + streams = demux->streams; + demux->streams = NULL; + /* clean old active stream list, if any */ + gst_active_streams_free (demux->client); + /* setup video, audio and subtitle streams, starting from the new Period */ if (!gst_mpd_client_set_period_index (demux->client, current_period) || !gst_dash_demux_setup_all_streams (demux)) return FALSE; + + gst_dash_demux_expose_streams (demux); + + gst_dash_demux_remove_streams (demux, streams); } if (list == NULL) { @@ -614,7 +630,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event) gst_uri_downloader_reset (demux->downloader); gst_dash_demux_resume_download_task (demux); gst_dash_demux_resume_stream_task (demux); - g_static_rec_mutex_unlock (&demux->stream_lock); + g_static_rec_mutex_unlock (&demux->stream_task_lock); } return TRUE; @@ -632,6 +648,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) GList *listLang = NULL; guint i, nb_audio; gchar *lang; + GSList *streams = NULL; GST_MPD_CLIENT_LOCK (demux->client); /* clean old active stream list, if any */ @@ -645,7 +662,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) &listLang); if (nb_audio == 0) nb_audio = 1; - GST_INFO_OBJECT (demux, "Number of language is=%d", nb_audio); + GST_INFO_OBJECT (demux, "Number of languages is=%d", nb_audio); for (i = 0; i < nb_audio; i++) { lang = (gchar *) g_list_nth_data (listLang, i); @@ -686,12 +703,12 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) DOWNLOAD_RATE_HISTORY_MAX); GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps); - demux->streams = g_slist_prepend (demux->streams, stream); + streams = g_slist_prepend (streams, stream); + stream->pad = gst_dash_demux_create_pad (demux); } - demux->streams = g_slist_reverse (demux->streams); - - gst_dash_demux_create_pads (demux); + streams = g_slist_reverse (streams); + demux->next_periods = g_slist_append (demux->next_periods, streams); GST_MPD_CLIENT_UNLOCK (demux->client); return TRUE; @@ -759,6 +776,8 @@ gst_dash_demux_sink_event (GstPad * pad, GstEvent * event) !gst_dash_demux_setup_all_streams (demux)) return FALSE; + gst_dash_demux_advance_period (demux); + /* start playing from the first segment */ gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0); @@ -845,6 +864,7 @@ gst_dash_demux_src_query (GstPad * pad, GstQuery * query) gst_query_set_seeking (query, fmt, !gst_mpd_client_is_live (dashdemux->client), 0, stop); ret = TRUE; + GST_DEBUG_OBJECT (dashdemux, "GST_QUERY_SEEKING returning with stop : %" GST_TIME_FORMAT, GST_TIME_ARGS (stop)); } @@ -891,15 +911,15 @@ gst_dash_demux_stop (GstDashDemux * demux) if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) { GST_TASK_SIGNAL (demux->download_task); gst_task_stop (demux->download_task); - g_static_rec_mutex_lock (&demux->download_lock); - g_static_rec_mutex_unlock (&demux->download_lock); + g_static_rec_mutex_lock (&demux->download_task_lock); + g_static_rec_mutex_unlock (&demux->download_task_lock); gst_task_join (demux->download_task); } if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { GST_TASK_SIGNAL (demux->stream_task); gst_task_stop (demux->stream_task); - g_static_rec_mutex_lock (&demux->stream_lock); - g_static_rec_mutex_unlock (&demux->stream_lock); + g_static_rec_mutex_lock (&demux->stream_task_lock); + g_static_rec_mutex_unlock (&demux->stream_task_lock); gst_task_join (demux->stream_task); } @@ -911,32 +931,89 @@ gst_dash_demux_stop (GstDashDemux * demux) } } +static GstPad * +gst_dash_demux_create_pad (GstDashDemux * demux) +{ + GstPad *pad; + + /* Create and activate new pads */ + pad = gst_pad_new_from_static_template (&srctemplate, NULL); + gst_pad_set_event_function (pad, + GST_DEBUG_FUNCPTR (gst_dash_demux_src_event)); + gst_pad_set_query_function (pad, + GST_DEBUG_FUNCPTR (gst_dash_demux_src_query)); + gst_pad_set_element_private (pad, demux); + gst_pad_set_active (pad, TRUE); + GST_INFO_OBJECT (demux, "Creating srcpad %s:%s", GST_DEBUG_PAD_NAME (pad)); + return pad; +} + static void -gst_dash_demux_create_pads (GstDashDemux * demux) +gst_dash_demux_expose_streams (GstDashDemux * demux) { GSList *iter; - /* Create and activate new pads */ for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; + GST_LOG_OBJECT (demux, "Exposing stream %d %" GST_PTR_FORMAT, stream->index, + stream->input_caps); g_assert (stream->pad == NULL); - - stream->pad = gst_pad_new_from_static_template (&srctemplate, NULL); - gst_pad_set_event_function (stream->pad, - GST_DEBUG_FUNCPTR (gst_dash_demux_src_event)); - gst_pad_set_query_function (stream->pad, - GST_DEBUG_FUNCPTR (gst_dash_demux_src_query)); - gst_pad_set_element_private (stream->pad, demux); - gst_pad_set_active (stream->pad, TRUE); - GST_INFO_OBJECT (demux, "Adding srcpad %s:%s", - GST_DEBUG_PAD_NAME (stream->pad)); gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad)); } - gst_element_no_more_pads (GST_ELEMENT_CAST (demux)); } +static void +gst_dash_demux_remove_streams (GstDashDemux * demux, GSList * streams) +{ + GSList *iter; + GstEvent *eos = gst_event_new_eos (); + + for (iter = streams; iter; iter = g_slist_next (iter)) { + GstDashDemuxStream *stream = iter->data;; + + GST_LOG_OBJECT (demux, "Removing stream %d %" GST_PTR_FORMAT, stream->index, + stream->input_caps); + g_assert (stream->pad == NULL); + gst_pad_push_event (stream->pad, gst_event_ref (eos)); + gst_pad_set_active (stream->pad, FALSE); + gst_element_remove_pad (GST_ELEMENT (demux), stream->pad); + gst_dash_demux_stream_free (stream); + } + gst_event_unref (eos); + g_slist_free (streams); +} + +static gboolean +gst_dash_demux_advance_period (GstDashDemux * demux) +{ + GSList *old_period = NULL; + g_static_mutex_lock (&demux->streams_lock); + + if (demux->streams) { + g_assert (demux->streams == demux->next_periods->data); + + demux->next_periods = g_slist_remove (demux->next_periods, demux->streams); + old_period = demux->streams; + demux->streams = NULL; + } + + if (demux->next_periods) { + demux->streams = demux->next_periods->data; + } else { + GST_DEBUG_OBJECT (demux, "No next periods, return FALSE"); + g_static_mutex_unlock (&demux->streams_lock); + return FALSE; + } + + gst_dash_demux_expose_streams (demux); + gst_dash_demux_remove_streams (demux, old_period); + + g_static_mutex_unlock (&demux->streams_lock); + return TRUE; +} + /* gst_dash_demux_stream_loop: * * Loop for the "stream' task that pushes fragments to the src pads. @@ -962,7 +1039,6 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) { GstFlowReturn ret; GstActiveStream *active_stream; - guint i = 0; GSList *iter; GstClockTime best_time; GstDashDemuxStream *selected_stream; @@ -973,7 +1049,8 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) best_time = GST_CLOCK_TIME_NONE; selected_stream = NULL; - for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) { + + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; GstDataQueueItem *item; @@ -1030,8 +1107,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) selected_stream->index); if (demux->need_segment) { /* And send a newsegment */ - for (iter = demux->streams, i = 0; iter; - i++, iter = g_slist_next (iter)) { + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; gst_pad_push_event (stream->pad, gst_event_new_new_segment (FALSE, demux->segment.rate, @@ -1043,9 +1119,10 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) GST_DEBUG_OBJECT (demux, "Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%" - GST_TIME_FORMAT, buffer, GST_BUFFER_OFFSET (buffer), + GST_TIME_FORMAT " at pad: %s:%s", buffer, GST_BUFFER_OFFSET (buffer), selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)), + GST_DEBUG_PAD_NAME (selected_stream->pad)); ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer)); gst_segment_set_last_stop (&demux->segment, GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buffer)); @@ -1072,7 +1149,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) if (eos) { goto end_of_manifest; } else if (eop) { - /* TODO advance to next period */ + gst_dash_demux_advance_period (demux); } } @@ -1294,7 +1371,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux) gst_buffer_unref (buffer); stream = gst_mpdparser_get_active_stream_by_index (demux->client, 0); segment_index = gst_mpd_client_get_segment_index (stream); - /* setup video, audio and subtitle streams, starting from first Period */ + /* setup video, audio and subtitle streams, starting from current Period */ if (!gst_mpd_client_setup_media_presentation (demux->client) || !gst_mpd_client_set_period_index (demux->client, gst_mpd_client_get_period_index (demux->client)) @@ -1303,7 +1380,7 @@ gst_dash_demux_download_loop (GstDashDemux * demux) "Error setting up the updated manifest file"); goto end_of_manifest; } - /* continue playing from the the next segment */ + /* continue playing from the next segment */ /* FIXME: support multiple streams with different segment duration */ gst_mpd_client_set_segment_index_for_all_streams (demux->client, segment_index); @@ -1635,8 +1712,15 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux) gboolean end_of_period = TRUE; GstDashDemuxStream *selected_stream = NULL; GstClockTime best_time = GST_CLOCK_TIME_NONE; + GSList *streams; - for (iter = demux->streams; iter; iter = g_slist_next (iter)) { + g_static_mutex_lock (&demux->streams_lock); + /* TODO add check */ + streams = g_slist_last (demux->next_periods)->data; + + g_static_mutex_unlock (&demux->streams_lock); + + for (iter = streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; GstClockTime ts; @@ -1658,6 +1742,8 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux) if (gst_mpd_client_has_next_period (demux->client)) { event = gst_event_new_dash_eop (); } else { + GST_DEBUG_OBJECT (demux, + "No more fragments or periods for this stream, setting EOS"); event = gst_event_new_eos (); } stream->download_end_of_period = TRUE; diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index 2c8257eb14..fb2c790d4c 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -107,6 +107,8 @@ struct _GstDashDemux GstPad *sinkpad; GSList *streams; + GSList *next_periods; + GStaticMutex streams_lock; GstSegment segment; gboolean need_segment; @@ -124,12 +126,12 @@ struct _GstDashDemux /* Streaming task */ GstTask *stream_task; - GStaticRecMutex stream_lock; + GStaticRecMutex stream_task_lock; GMutex *stream_timed_lock; /* Download task */ GstTask *download_task; - GStaticRecMutex download_lock; + GStaticRecMutex download_task_lock; gboolean cancelled; GMutex *download_timed_lock;