dashdemux: reimplementing live streams manifest updates

Manifest updates should be done periodically for live streams,
this patch makes the demuxer create a new manifest client for
the new version and transfers the stream position to the new
one, discarding the old one afterwards.
This commit is contained in:
Thiago Santos 2013-02-22 16:40:36 -03:00
parent b01a3729c0
commit 14feeb3ccb
3 changed files with 254 additions and 60 deletions

View file

@ -647,24 +647,19 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
return gst_pad_event_default (pad, event);
}
static gboolean
gst_dash_demux_setup_all_streams (GstDashDemux * demux)
static void
gst_dash_demux_setup_mpdparser_streams (GstDashDemux * demux,
GstMpdClient * client)
{
GList *listLang = NULL;
guint i, nb_audio;
gchar *lang;
GSList *streams = NULL;
GST_MPD_CLIENT_LOCK (demux->client);
/* clean old active stream list, if any */
gst_active_streams_free (demux->client);
if (!gst_mpd_client_setup_streaming (demux->client, GST_STREAM_VIDEO, ""))
if (!gst_mpd_client_setup_streaming (client, GST_STREAM_VIDEO, ""))
GST_INFO_OBJECT (demux, "No video adaptation set found");
nb_audio =
gst_mpdparser_get_list_and_nb_of_audio_language (demux->client,
&listLang);
gst_mpdparser_get_list_and_nb_of_audio_language (client, &listLang);
if (nb_audio == 0)
nb_audio = 1;
GST_INFO_OBJECT (demux, "Number of languages is=%d", nb_audio);
@ -672,16 +667,28 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
for (i = 0; i < nb_audio; i++) {
lang = (gchar *) g_list_nth_data (listLang, i);
GST_INFO ("nb adaptation set: %i",
gst_mpdparser_get_nb_adaptationSet (demux->client));
if (!gst_mpd_client_setup_streaming (demux->client, GST_STREAM_AUDIO, lang))
gst_mpdparser_get_nb_adaptationSet (client));
if (!gst_mpd_client_setup_streaming (client, GST_STREAM_AUDIO, lang))
GST_INFO_OBJECT (demux, "No audio adaptation set found");
if (gst_mpdparser_get_nb_adaptationSet (demux->client) > nb_audio)
if (!gst_mpd_client_setup_streaming (demux->client,
if (gst_mpdparser_get_nb_adaptationSet (client) > nb_audio)
if (!gst_mpd_client_setup_streaming (client,
GST_STREAM_APPLICATION, lang))
GST_INFO_OBJECT (demux, "No application adaptation set found");
}
}
static gboolean
gst_dash_demux_setup_all_streams (GstDashDemux * demux)
{
guint i;
GSList *streams = NULL;
GST_MPD_CLIENT_LOCK (demux->client);
/* clean old active stream list, if any */
gst_active_streams_free (demux->client);
gst_dash_demux_setup_mpdparser_streams (demux, demux->client);
GST_DEBUG_OBJECT (demux, "Creating stream objects");
for (i = 0; i < gst_mpdparser_get_nb_active_stream (demux->client); i++) {
@ -1082,14 +1089,21 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
}
if (G_LIKELY (GST_IS_BUFFER (item->object))) {
GST_LOG_OBJECT (demux, "Buffer with time %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (item->object)));
if (GST_BUFFER_TIMESTAMP (item->object) < best_time) {
GST_DEBUG_OBJECT (demux, "Found new best time: %" GST_TIME_FORMAT " %p",
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (item->object)), item->object);
best_time = GST_BUFFER_TIMESTAMP (item->object);
selected_stream = stream;
} else if (!GST_CLOCK_TIME_IS_VALID (GST_BUFFER_TIMESTAMP (item->object))) {
selected_stream = stream;
GST_DEBUG_OBJECT (demux, "Buffer without timestamp selected %p",
item->object);
break;
}
} else {
GST_DEBUG_OBJECT (demux, "Non buffers have preference %p", item->object);
selected_stream = stream;
break;
}
@ -1343,21 +1357,24 @@ gst_dash_demux_all_streams_have_data (GstDashDemux * demux)
void
gst_dash_demux_download_loop (GstDashDemux * demux)
{
GstClock *clock = gst_element_get_clock (GST_ELEMENT (demux));
gint64 update_period = demux->client->mpd_node->minimumUpdatePeriod;
GST_LOG_OBJECT (demux, "Starting download loop");
if (clock && gst_mpd_client_is_live (demux->client)
if (gst_mpd_client_is_live (demux->client)
&& demux->client->mpd_uri != NULL && update_period != -1) {
GstFragment *download;
GstBuffer *buffer;
GstClockTime duration, now = gst_clock_get_time (clock);
GstClockTime duration, now = gst_util_get_timestamp ();
/* init reference time for manifest file updates */
if (!GST_CLOCK_TIME_IS_VALID (demux->last_manifest_update))
demux->last_manifest_update = now;
GST_DEBUG_OBJECT (demux,
"Next update: %" GST_TIME_FORMAT " now: %" GST_TIME_FORMAT,
GST_TIME_ARGS ((demux->last_manifest_update +
update_period * GST_MSECOND)), GST_TIME_ARGS (now));
/* update the manifest file */
if (now >= demux->last_manifest_update + update_period * GST_MSECOND) {
GST_DEBUG_OBJECT (demux, "Updating manifest file from URL %s",
@ -1370,38 +1387,90 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
"Failed to update the manifest file from URL %s",
demux->client->mpd_uri);
} else {
GstMpdClient *new_client = NULL;
buffer = gst_fragment_get_buffer (download);
g_object_unref (download);
/* parse the manifest file */
if (buffer == NULL) {
GST_WARNING_OBJECT (demux, "Error validating the manifest.");
} else if (!gst_mpd_parse (demux->client,
(gchar *) GST_BUFFER_DATA (buffer), GST_BUFFER_SIZE (buffer))) {
} else {
new_client = gst_mpd_client_new ();
new_client->mpd_uri = g_strdup (demux->client->mpd_uri);
if (!gst_mpd_parse (new_client,
(gchar *) GST_BUFFER_DATA (buffer),
GST_BUFFER_SIZE (buffer))) {
/* In most cases, this will happen if we set a wrong url in the
* source element and we have received the 404 HTML response instead of
* the manifest */
GST_WARNING_OBJECT (demux, "Error parsing the manifest.");
gst_buffer_unref (buffer);
} else {
GstActiveStream *stream;
guint segment_index;
const gchar *period_id;
guint period_idx;
GSList *iter;
/* prepare the new manifest and try to transfer the stream position
* status from the old manifest client */
gst_buffer_unref (buffer);
stream = gst_mpdparser_get_active_stream_by_index (demux->client, 0);
segment_index = gst_mpd_client_get_segment_index (stream);
GST_DEBUG_OBJECT (demux, "Updating manifest");
period_id = gst_mpd_client_get_period_id (demux->client);
period_idx = gst_mpd_client_get_period_index (demux->client);
/* setup video, audio and subtitle streams, starting from current Period */
if (!gst_mpd_client_setup_media_presentation (demux->client) ||
!gst_mpd_client_set_period_index (demux->client,
gst_mpd_client_get_period_index (demux->client))
|| !gst_dash_demux_setup_all_streams (demux)) {
if (!gst_mpd_client_setup_media_presentation (new_client)) {
/* TODO */
}
if (period_idx) {
if (!gst_mpd_client_set_period_id (new_client, period_id)) {
GST_DEBUG_OBJECT (demux,
"Error setting up the updated manifest file");
goto end_of_manifest;
}
/* continue playing from the next segment */
/* FIXME: support multiple streams with different segment duration */
gst_mpd_client_set_segment_index_for_all_streams (demux->client,
segment_index);
} else {
if (!gst_mpd_client_set_period_index (new_client, period_idx)) {
GST_DEBUG_OBJECT (demux,
"Error setting up the updated manifest file");
goto end_of_manifest;
}
}
gst_dash_demux_setup_mpdparser_streams (demux, new_client);
/* update the streams to play from the next segment */
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *demux_stream = iter->data;
GstActiveStream *new_stream;
GstClockTime ts;
new_stream =
gst_mpdparser_get_active_stream_by_index (new_client,
demux_stream->index);
if (!new_stream) {
GST_DEBUG_OBJECT (demux,
"Stream of index %d is missing from manifest update",
demux_stream->index);
goto end_of_manifest;
}
if (gst_mpd_client_get_next_fragment_timestamp (demux->client,
demux_stream->index, &ts)) {
gst_mpd_client_stream_seek (new_client, new_stream, ts);
} else
if (gst_mpd_client_get_last_fragment_timestamp (demux->client,
demux_stream->index, &ts)) {
/* try to set to the old timestamp + 1 */
gst_mpd_client_stream_seek (new_client, new_stream, ts + 1);
}
}
gst_mpd_client_free (demux->client);
demux->client = new_client;
/* Send an updated duration message */
duration =
@ -1418,13 +1487,13 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
GST_DEBUG_OBJECT (demux,
"mediaPresentationDuration unknown, can not send the duration message");
}
demux->last_manifest_update += update_period * GST_MSECOND;
demux->last_manifest_update = gst_util_get_timestamp ();
GST_DEBUG_OBJECT (demux, "Manifest file successfully updated");
}
}
}
}
}
GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest);
@ -1736,7 +1805,6 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
g_static_mutex_lock (&demux->streams_lock);
/* TODO add check */
streams = g_slist_last (demux->next_periods)->data;
g_static_mutex_unlock (&demux->streams_lock);
for (iter = streams; iter; iter = g_slist_next (iter)) {
@ -1758,6 +1826,14 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
GST_INFO_OBJECT (demux,
"This Period doesn't contain more fragments for stream %u",
stream->index);
/* check if this is live and we should wait for more data */
if (gst_mpd_client_is_live (demux->client)
&& demux->client->mpd_node->minimumUpdatePeriod != -1) {
end_of_period = FALSE;
continue;
}
if (gst_mpd_client_has_next_period (demux->client)) {
event = gst_event_new_dash_eop ();
} else {

View file

@ -156,6 +156,9 @@ static GstSegmentListNode *gst_mpdparser_get_segment_list (GstPeriodNode *
Period, GstAdaptationSetNode * AdaptationSet,
GstRepresentationNode * Representation);
/* Segments */
static guint gst_mpd_client_get_segments_counts (GstActiveStream * stream);
/* Memory management */
static void gst_mpdparser_free_mpd_node (GstMPDNode * mpd_node);
static void gst_mpdparser_free_prog_info_node (GstProgramInformationNode *
@ -3188,6 +3191,67 @@ gst_mpd_client_setup_streaming (GstMpdClient * client,
return TRUE;
}
gboolean
gst_mpd_client_stream_seek (GstMpdClient * client, GstActiveStream * stream,
GstClockTime ts)
{
gint segment_idx = 0;
GstMediaSegment *selectedChunk = NULL;
GList *iter;
g_return_val_if_fail (stream != NULL, 0);
GST_MPD_CLIENT_LOCK (client);
for (iter = stream->segments; iter; iter = g_list_next (iter), segment_idx++) {
GstMediaSegment *segment = iter->data;
GST_DEBUG ("Looking at fragment sequence chunk %d", segment_idx);
if (segment->start_time >= ts) {
selectedChunk = segment;
break;
}
}
if (selectedChunk == NULL) {
GST_MPD_CLIENT_UNLOCK (client);
return FALSE;
}
gst_mpd_client_set_segment_index (stream, segment_idx);
GST_MPD_CLIENT_UNLOCK (client);
return TRUE;
}
gboolean
gst_mpd_client_get_last_fragment_timestamp (GstMpdClient * client,
guint stream_idx, GstClockTime * ts)
{
GstActiveStream *stream;
gint segment_idx;
GstMediaSegment *currentChunk;
GST_DEBUG ("Stream index: %i", stream_idx);
stream = g_list_nth_data (client->active_streams, stream_idx);
g_return_val_if_fail (stream != NULL, 0);
GST_MPD_CLIENT_LOCK (client);
segment_idx = gst_mpd_client_get_segments_counts (stream) - 1;
GST_DEBUG ("Looking for fragment sequence chunk %d", segment_idx);
currentChunk =
gst_mpdparser_get_chunk_by_index (client, stream_idx, segment_idx);
if (currentChunk == NULL) {
GST_MPD_CLIENT_UNLOCK (client);
return FALSE;
}
*ts = currentChunk->start_time;
GST_MPD_CLIENT_UNLOCK (client);
return TRUE;
}
gboolean
gst_mpd_client_get_next_fragment_timestamp (GstMpdClient * client,
guint stream_idx, GstClockTime * ts)
@ -3371,6 +3435,32 @@ gst_mpd_client_get_media_presentation_duration (GstMpdClient * client)
return duration;
}
gboolean
gst_mpd_client_set_period_id (GstMpdClient * client, const gchar * period_id)
{
GstStreamPeriod *next_stream_period;
gboolean ret = FALSE;
GList *iter;
g_return_val_if_fail (client != NULL, FALSE);
g_return_val_if_fail (client->periods != NULL, FALSE);
g_return_val_if_fail (period_id != NULL, FALSE);
GST_MPD_CLIENT_LOCK (client);
for (iter = client->periods; iter; iter = g_list_next (iter)) {
next_stream_period = iter->data;
if (next_stream_period->period->id
&& strcmp (next_stream_period->period->id, period_id) == 0) {
ret = TRUE;
break;
}
}
GST_MPD_CLIENT_UNLOCK (client);
return ret;
}
gboolean
gst_mpd_client_set_period_index (GstMpdClient * client, guint period_idx)
{
@ -3404,6 +3494,22 @@ gst_mpd_client_get_period_index (GstMpdClient * client)
return period_idx;
}
const gchar *
gst_mpd_client_get_period_id (GstMpdClient * client)
{
GstStreamPeriod *period;
gchar *period_id = NULL;
g_return_val_if_fail (client != NULL, 0);
GST_MPD_CLIENT_LOCK (client);
period = g_list_nth_data (client->periods, client->period_idx);
if (period && period->period)
period_id = period->period->id;
GST_MPD_CLIENT_UNLOCK (client);
return period_id;
}
gboolean
gst_mpd_client_has_next_period (GstMpdClient * client)
{
@ -3454,6 +3560,14 @@ gst_mpd_client_get_segment_index (GstActiveStream * stream)
return stream->segment_idx;
}
static guint
gst_mpd_client_get_segments_counts (GstActiveStream * stream)
{
g_return_val_if_fail (stream != NULL, 0);
return g_list_length (stream->segments);
}
gboolean
gst_mpd_client_is_live (GstMpdClient * client)
{

View file

@ -471,14 +471,18 @@ gboolean gst_mpd_client_setup_representation (GstMpdClient *client, GstActiveStr
GstClockTime gst_mpd_client_get_current_position (GstMpdClient *client);
GstClockTime gst_mpd_client_get_next_fragment_duration (GstMpdClient * client);
GstClockTime gst_mpd_client_get_media_presentation_duration (GstMpdClient *client);
gboolean gst_mpd_client_get_last_fragment_timestamp (GstMpdClient * client, guint stream_idx, GstClockTime * ts);
gboolean gst_mpd_client_get_next_fragment_timestamp (GstMpdClient * client, guint stream_idx, GstClockTime * ts);
gboolean gst_mpd_client_get_next_fragment (GstMpdClient *client, guint indexStream, gboolean *discontinuity, gchar **uri, GstClockTime *duration, GstClockTime *timestamp);
gboolean gst_mpd_client_get_next_header (GstMpdClient *client, gchar **uri, guint stream_idx);
gboolean gst_mpd_client_is_live (GstMpdClient * client);
gboolean gst_mpd_client_stream_seek (GstMpdClient * client, GstActiveStream * stream, GstClockTime ts);
/* Period selection */
gboolean gst_mpd_client_set_period_index (GstMpdClient *client, guint period_idx);
gboolean gst_mpd_client_set_period_id (GstMpdClient *client, const gchar * period_id);
guint gst_mpd_client_get_period_index (GstMpdClient *client);
const gchar *gst_mpd_client_get_period_id (GstMpdClient *client);
gboolean gst_mpd_client_has_next_period (GstMpdClient *client);
/* Representation selection */