dashdemux: handle live playback resync

During a live stream it is possible for dashdemux to lag behind on a
slow connection or to rush ahead of the connection os too fast.

For the first case it is necessary to jump some segments ahead to be able to
continue playback as old segments are usually deleted from the server.

For the later, dashdemux should wait a little before attempting another
download do give time to the server to produce a new segment
This commit is contained in:
Thiago Santos 2013-07-08 23:24:28 -03:00
parent b316d8a677
commit 5a5e66ec90
4 changed files with 162 additions and 4 deletions

View file

@ -216,8 +216,11 @@ static void gst_dash_demux_resume_stream_task (GstDashDemux * demux);
static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
static gboolean gst_dash_demux_select_representations (GstDashDemux * demux);
static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux);
static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux,
GstActiveStream ** stream, GstClockTime * next_ts);
static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
static void gst_dash_demux_download_wait (GstDashDemux * demux,
GstClockTime time_diff);
static void gst_dash_demux_expose_streams (GstDashDemux * demux);
static void gst_dash_demux_remove_streams (GstDashDemux * demux,
@ -254,6 +257,8 @@ gst_dash_demux_dispose (GObject * obj)
g_rec_mutex_clear (&demux->download_task_lock);
demux->download_task = NULL;
}
g_cond_clear (&demux->download_cond);
g_mutex_clear (&demux->download_mutex);
if (demux->downloader != NULL) {
g_object_unref (demux->downloader);
@ -342,6 +347,8 @@ gst_dash_demux_init (GstDashDemux * demux)
gst_task_new ((GstTaskFunction) gst_dash_demux_download_loop, demux,
NULL);
gst_task_set_lock (demux->download_task, &demux->download_task_lock);
g_cond_init (&demux->download_cond);
g_mutex_init (&demux->download_mutex);
/* Streaming task */
g_rec_mutex_init (&demux->stream_task_lock);
@ -998,6 +1005,9 @@ gst_dash_demux_stop (GstDashDemux * demux)
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->download_task);
gst_task_stop (demux->download_task);
g_mutex_lock (&demux->download_mutex);
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_mutex);
g_rec_mutex_lock (&demux->download_task_lock);
g_rec_mutex_unlock (&demux->download_task_lock);
gst_task_join (demux->download_task);
@ -1457,6 +1467,8 @@ void
gst_dash_demux_download_loop (GstDashDemux * demux)
{
gint64 update_period = demux->client->mpd_node->minimumUpdatePeriod;
GstClockTime fragment_ts = GST_CLOCK_TIME_NONE;
GstActiveStream *fragment_stream = NULL;
GST_LOG_OBJECT (demux, "Starting download loop");
@ -1604,7 +1616,8 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
}
/* fetch the next fragment */
while (!gst_dash_demux_get_next_fragment (demux)) {
while (!gst_dash_demux_get_next_fragment (demux, &fragment_stream,
&fragment_ts)) {
if (demux->end_of_period) {
GST_INFO_OBJECT (demux, "Reached the end of the Period");
/* setup video, audio and subtitle streams, starting from the next Period */
@ -1620,7 +1633,43 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
demux->end_of_period = FALSE;
} else if (!demux->cancelled) {
demux->client->update_failed_count++;
/* in case this is live, we might be ahead or before playback, so we
* either wait or jump ahead */
if (gst_mpd_client_is_live (demux->client)) {
gint64 time_diff;
gint pos;
pos =
gst_mpd_client_check_time_position (demux->client, fragment_stream,
fragment_ts, &time_diff);
GST_DEBUG_OBJECT (demux,
"Checked position for fragment ts %" GST_TIME_FORMAT
", res: %d, diff: %" G_GINT64_FORMAT, GST_TIME_ARGS (fragment_ts),
pos, time_diff);
time_diff *= GST_USECOND;
if (pos < 0) {
/* we're behind, try moving to the 'present' */
GDateTime *now = g_date_time_new_now_utc ();
GST_DEBUG_OBJECT (demux,
"Falling behind live stream, moving forward");
gst_mpd_client_seek_to_time (demux->client, now);
g_date_time_unref (now);
} else if (pos > 0) {
/* we're ahead, wait a little */
gst_mpd_client_set_segment_index (fragment_stream,
fragment_stream->segment_idx - 1);
gst_dash_demux_download_wait (demux, time_diff);
} else {
gst_mpd_client_set_segment_index (fragment_stream,
fragment_stream->segment_idx - 1);
demux->client->update_failed_count++;
}
} else {
demux->client->update_failed_count++;
}
if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
goto quit;
@ -1920,6 +1969,8 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
/* gst_dash_demux_get_next_fragment:
*
* Get the next fragments for the stream with the earlier timestamp.
* It returns the selected timestamp so the caller can deal with
* sync issues in case the stream is live.
*
* This function uses the generic URI downloader API.
*
@ -1927,7 +1978,8 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
*
*/
static gboolean
gst_dash_demux_get_next_fragment (GstDashDemux * demux)
gst_dash_demux_get_next_fragment (GstDashDemux * demux,
GstActiveStream ** stream, GstClockTime * selected_ts)
{
GstActiveStream *active_stream;
GstFragment *download;
@ -1985,6 +2037,12 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
gst_dash_demux_stream_push_event (stream, event);
}
}
if (selected_ts)
*selected_ts = best_time;
if (stream && selected_stream)
*stream =
gst_mpdparser_get_active_stream_by_index (demux->client,
selected_stream->index);
/* Get the fragment corresponding to each stream index */
if (selected_stream) {
@ -2097,3 +2155,14 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
}
return TRUE;
}
static void
gst_dash_demux_download_wait (GstDashDemux * demux, GstClockTime time_diff)
{
gint64 end_time = g_get_monotonic_time () + time_diff / GST_USECOND;
GST_DEBUG_OBJECT (demux, "Download waiting for %" GST_TIME_FORMAT,
GST_TIME_ARGS (time_diff));
g_cond_wait_until (&demux->download_cond, &demux->download_mutex, end_time);
GST_DEBUG_OBJECT (demux, "Download finished waiting");
}

View file

@ -132,6 +132,8 @@ struct _GstDashDemux
/* Download task */
GstTask *download_task;
GRecMutex download_task_lock;
GMutex download_mutex;
GCond download_cond;
gboolean cancelled;
/* Manifest update */

View file

@ -4284,6 +4284,91 @@ gst_mpdparser_get_list_and_nb_of_audio_language (GstMpdClient * client,
return nb_adapatation_set;
}
gint
gst_mpd_client_check_time_position (GstMpdClient * client,
GstActiveStream * stream, GstClockTime ts, gint64 * diff)
{
GDateTime *now = g_date_time_new_now_utc ();
GDateTime *start =
gst_date_time_to_g_date_time (client->mpd_node->availabilityStartTime);
GTimeSpan stream_now;
GTimeSpan ts_microseconds;
GstClockTime duration;
g_return_val_if_fail (gst_mpd_client_is_live (client), 0);
duration = gst_mpd_client_get_segment_duration (client, stream);
stream_now = g_date_time_difference (now, start);
g_date_time_unref (now);
g_date_time_unref (start);
/* sum duration to check if the segment is fully ready */
ts_microseconds = (ts + duration) / GST_USECOND;
/*
* This functions checks if a given ts is in the 'available range' of
* a DASH presentation. This only makes sense for live streams, which
* are continuously adding new segments and removing old ones.
*
* Note: Both the client and the server should use UTC as a time reference.
*
* @ts is the time since the beginning of the stream and we need to find out
* if it is currently available. The server should be hosting segments
*
* * ---------------- ... --- * ----------- * ---- ...
* |
* | past(unavailable) | | available | future(unavailable yet)
* |
* * ---------------- ... --- * ----------- * ---- ...
* | | |
* availabilitStartTime | UTC now
* UTC now - timeShiftBufferDepth
*
* This function should return 0 if @ts is in the 'available' area, 1 for
* 'future' and '-1' for past and the corresponding distance to the
* 'available' area is set to @diff
*
* TODO untested with live presentations with multiple periods as no
* examples for it could be found/generated
*/
if (ts_microseconds > stream_now) {
*diff = ts_microseconds - stream_now;
return 1;
}
if (client->mpd_node->timeShiftBufferDepth
&& ts_microseconds <
stream_now - client->mpd_node->timeShiftBufferDepth) {
*diff = ts_microseconds - stream_now;
return -1;
}
*diff = 0;
return 0;
}
gboolean
gst_mpd_client_seek_to_time (GstMpdClient * client, GDateTime * time)
{
GDateTime *start =
gst_date_time_to_g_date_time (client->mpd_node->availabilityStartTime);
GTimeSpan ts_microseconds;
GstClockTime ts;
gboolean ret = TRUE;
GList *stream;
g_return_val_if_fail (gst_mpd_client_is_live (client), 0);
ts_microseconds = g_date_time_difference (time, start);
g_date_time_unref (start);
ts = ts_microseconds * GST_USECOND;
for (stream = client->active_streams; stream; stream = g_list_next (stream)) {
ret = ret & gst_mpd_client_stream_seek (client, stream->data, ts);
}
return ret;
}
void
gst_media_fragment_info_clear (GstMediaFragmentInfo * fragment)
{

View file

@ -493,8 +493,10 @@ gboolean gst_mpd_client_get_next_header (GstMpdClient *client, gchar **uri, guin
gboolean gst_mpd_client_get_next_header_index (GstMpdClient *client, gchar **uri, guint stream_idx, gint64 * range_start, gint64 * range_end);
gboolean gst_mpd_client_is_live (GstMpdClient * client);
gboolean gst_mpd_client_stream_seek (GstMpdClient * client, GstActiveStream * stream, GstClockTime ts);
gboolean gst_mpd_client_seek_to_time (GstMpdClient * client, GDateTime * time);
GstDateTime *gst_mpd_client_add_time_difference (GstDateTime * t1, gint64 usecs);
gint gst_mpd_client_get_segment_index_at_time (GstMpdClient *client, GstActiveStream * stream, const GstDateTime *time);
gint gst_mpd_client_check_time_position (GstMpdClient * client, GstActiveStream * stream, GstClockTime ts, gint64 * diff);
/* Period selection */
gboolean gst_mpd_client_set_period_index (GstMpdClient *client, guint period_idx);