dashdemux: handle multiple languages

Handle multiple languages by using the not-linked return to stop
the download task for that stream. It can be reactivated when
a reconfigure event is received. Stopping the unused streams is
relevant to save network bandwidth
This commit is contained in:
Thiago Santos 2013-12-04 11:30:22 -03:00
parent 31e4ba0094
commit 6611d14eed
2 changed files with 248 additions and 94 deletions

View file

@ -181,8 +181,11 @@ enum
#define DEFAULT_FAILED_COUNT 3
#define DOWNLOAD_RATE_HISTORY_MAX 3
#define GST_DASH_DEMUX_DOWNLOAD_LOCK(d) g_mutex_lock (&d->download_mutex)
#define GST_DASH_DEMUX_DOWNLOAD_UNLOCK(d) g_mutex_unlock (&d->download_mutex)
#define GST_DASH_DEMUX_CLIENT_LOCK(d) g_mutex_lock (&d->client_lock)
#define GST_DASH_DEMUX_CLIENT_UNLOCK(d) g_mutex_unlock (&d->client_lock)
#define GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK(s) g_mutex_lock (&s->download_mutex)
#define GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK(s) g_mutex_unlock (&s->download_mutex)
/* Custom internal event to signal end of period */
#define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED)
@ -225,7 +228,7 @@ static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux,
GstDashDemuxStream * stream, GstActiveStream ** active_stream,
GstClockTime * next_ts);
static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
static void gst_dash_demux_download_wait (GstDashDemux * demux,
static void gst_dash_demux_download_wait (GstDashDemuxStream * stream,
GstClockTime time_diff);
static void gst_dash_demux_expose_streams (GstDashDemux * demux);
@ -261,15 +264,13 @@ gst_dash_demux_dispose (GObject * obj)
demux->stream_task = NULL;
}
g_cond_clear (&demux->download_cond);
g_mutex_clear (&demux->download_mutex);
if (demux->downloader != NULL) {
g_object_unref (demux->downloader);
demux->downloader = NULL;
}
g_mutex_clear (&demux->streams_lock);
g_mutex_clear (&demux->client_lock);
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@ -345,10 +346,6 @@ gst_dash_demux_init (GstDashDemux * demux)
demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE;
demux->max_bitrate = DEFAULT_MAX_BITRATE;
/* Updates task */
g_cond_init (&demux->download_cond);
g_mutex_init (&demux->download_mutex);
/* Streaming task */
g_rec_mutex_init (&demux->stream_task_lock);
demux->stream_task =
@ -356,6 +353,7 @@ gst_dash_demux_init (GstDashDemux * demux)
gst_task_set_lock (demux->stream_task, &demux->stream_task_lock);
g_mutex_init (&demux->streams_lock);
g_mutex_init (&demux->client_lock);
}
static void
@ -470,6 +468,38 @@ gst_dash_demux_stream_push_data (GstDashDemuxStream * stream,
gst_data_queue_push (stream->queue, item);
}
static void
gst_dash_demux_stream_seek (GstDashDemuxStream * stream,
GstClockTime target_pos)
{
gint seg_i;
guint current_sequence = 0;
GstActiveStream *active_stream;
GstMediaSegment *chunk;
GstClockTime current_pos = 0;
GstDashDemux *demux = stream->demux;
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream->index);
for (seg_i = 0; seg_i < active_stream->segments->len; seg_i++) {
chunk = g_ptr_array_index (active_stream->segments, seg_i);
current_pos = chunk->start_time;
/* current_sequence = chunk->number; */
GST_DEBUG_OBJECT (demux, "current_pos:%" GST_TIME_FORMAT
" <= target_pos:%" GST_TIME_FORMAT " duration:%"
GST_TIME_FORMAT, GST_TIME_ARGS (current_pos),
GST_TIME_ARGS (target_pos), GST_TIME_ARGS (chunk->duration));
if (current_pos <= target_pos && target_pos < current_pos + chunk->duration) {
GST_DEBUG_OBJECT (demux,
"selecting sequence %d for stream %" GST_PTR_FORMAT,
current_sequence, stream);
break;
}
current_sequence++;
}
gst_mpd_client_set_segment_index (active_stream, current_sequence);
}
static gboolean
gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
@ -487,9 +517,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
gint64 start, stop;
GList *list;
GstClockTime current_pos, target_pos;
guint current_sequence, current_period;
GstActiveStream *active_stream;
GstMediaSegment *chunk;
guint current_period;
GstStreamPeriod *period;
GSList *iter;
gboolean update;
@ -575,32 +603,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
/* Update the current sequence on all streams */
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gint seg_i;
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client,
stream->index);
current_pos = 0;
current_sequence = 0;
for (seg_i = 0; seg_i < active_stream->segments->len; seg_i++) {
chunk = g_ptr_array_index (active_stream->segments, seg_i);
current_pos = chunk->start_time;
/* current_sequence = chunk->number; */
GST_DEBUG_OBJECT (demux, "current_pos:%" GST_TIME_FORMAT
" <= target_pos:%" GST_TIME_FORMAT " duration:%"
GST_TIME_FORMAT, GST_TIME_ARGS (current_pos),
GST_TIME_ARGS (target_pos), GST_TIME_ARGS (chunk->duration));
if (current_pos <= target_pos
&& target_pos < current_pos + chunk->duration) {
GST_DEBUG_OBJECT (demux,
"selecting sequence %d for stream %" GST_PTR_FORMAT,
current_sequence, stream);
break;
}
current_sequence++;
}
gst_mpd_client_set_segment_index (active_stream, current_sequence);
gst_dash_demux_stream_seek (iter->data, target_pos);
}
if (flags & GST_SEEK_FLAG_FLUSH) {
@ -624,6 +627,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_data_queue_set_flushing (stream->queue, FALSE);
stream->last_ret = GST_FLOW_OK;
}
demux->timestamp_offset = 0;
demux->need_segment = TRUE;
@ -636,6 +640,28 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
return TRUE;
}
case GST_EVENT_RECONFIGURE:{
GSList *iter;
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
if (stream->pad == pad) {
GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream);
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
stream->last_ret = GST_FLOW_OK;
stream->restart_download = TRUE;
stream->need_header = TRUE;
GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
}
gst_task_start (stream->download_task);
GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
gst_event_unref (event);
return TRUE;
}
}
}
break;
default:
break;
}
@ -703,6 +729,8 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
gst_task_new ((GstTaskFunction) gst_dash_demux_stream_download_loop,
stream, NULL);
gst_task_set_lock (stream->download_task, &stream->download_task_lock);
g_cond_init (&stream->download_cond);
g_mutex_init (&stream->download_mutex);
stream->index = i;
stream->input_caps = caps;
@ -1018,13 +1046,10 @@ gst_dash_demux_stop (GstDashDemux * demux)
gst_data_queue_set_flushing (stream->queue, TRUE);
if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (stream->download_task);
stream->last_ret = GST_FLOW_FLUSHING;
stream->need_header = TRUE;
gst_task_stop (stream->download_task);
g_mutex_lock (&demux->download_mutex);
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_mutex);
g_rec_mutex_lock (&stream->download_task_lock);
g_rec_mutex_unlock (&stream->download_task_lock);
GST_TASK_SIGNAL (stream->download_task);
gst_task_join (stream->download_task);
}
}
@ -1154,7 +1179,6 @@ static void
gst_dash_demux_stream_loop (GstDashDemux * demux)
{
GstFlowReturn ret;
GstActiveStream *active_stream;
GSList *iter;
GstClockTime best_time;
GstDashDemuxStream *selected_stream;
@ -1172,6 +1196,9 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
GST_DEBUG_OBJECT (demux, "Peeking stream %d", stream->index);
if (stream->last_ret == GST_FLOW_NOT_LINKED)
continue;
if (stream->stream_eos) {
GST_DEBUG_OBJECT (demux, "Stream %d is eos, skipping", stream->index);
continue;
@ -1221,6 +1248,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
}
}
ret = GST_FLOW_OK;
if (selected_stream) {
GstDataQueueItem *item;
@ -1232,13 +1260,9 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
if (G_LIKELY (GST_IS_BUFFER (item->object))) {
GstBuffer *buffer;
GstClockTime timestamp;
GstClockTime timestamp, duration;
buffer = GST_BUFFER_CAST (item->object);
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client,
selected_stream->index);
timestamp = GST_BUFFER_TIMESTAMP (buffer);
if (demux->need_segment) {
@ -1255,32 +1279,36 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
}
/* make timestamp start from 0 by subtracting the offset */
timestamp -= demux->timestamp_offset;
duration = GST_BUFFER_DURATION (buffer);
GST_BUFFER_TIMESTAMP (buffer) = timestamp;
GST_DEBUG_OBJECT (demux,
"Pushing fragment ts: %" GST_TIME_FORMAT " at pad %s",
GST_TIME_ARGS (timestamp), GST_PAD_NAME (selected_stream->pad));
#if 0
GST_DEBUG_OBJECT (demux,
"Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT " at pad: %s:%s", buffer, GST_BUFFER_OFFSET (buffer),
selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
"Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%"
GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT " at pad: %s:%s", buffer,
GST_BUFFER_OFFSET (buffer), selected_stream->index,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
GST_DEBUG_PAD_NAME (selected_stream->pad));
#endif
ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer));
GST_DEBUG_OBJECT (demux, "Push result: %d %s", ret,
gst_flow_get_name (ret));
demux->segment.position = timestamp;
selected_stream->position = timestamp;
if (GST_CLOCK_TIME_IS_VALID (duration))
selected_stream->position += duration;
item->destroy (item);
if ((ret != GST_FLOW_OK) && (active_stream
&& active_stream->mimeType == GST_STREAM_VIDEO))
goto error_pushing;
} else {
/* a GstEvent */
if (GST_EVENT_TYPE (item->object) == GST_EVENT_EOS) {
selected_stream->stream_end_of_period = TRUE;
selected_stream->stream_eos = TRUE;
ret = GST_FLOW_EOS;
} else if (GST_EVENT_TYPE (item->object) == GST_EVENT_DASH_EOP) {
selected_stream->stream_end_of_period = TRUE;
}
@ -1298,6 +1326,19 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
gst_dash_demux_advance_period (demux);
}
}
GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (selected_stream);
if (ret != selected_stream->last_ret) {
gst_task_start (selected_stream->download_task);
selected_stream->last_ret = ret;
}
switch (selected_stream->last_ret) {
case GST_FLOW_NOT_LINKED:
gst_data_queue_set_flushing (selected_stream->queue, TRUE);
break;
default:
break;
}
GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (selected_stream);
end:
GST_INFO_OBJECT (demux, "Leaving streaming task");
@ -1317,6 +1358,7 @@ end_of_manifest:
return;
}
#if 0
error_pushing:
{
/* FIXME: handle error */
@ -1326,6 +1368,7 @@ error_pushing:
gst_task_stop (demux->stream_task);
return;
}
#endif
}
static void
@ -1350,6 +1393,8 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream)
gst_object_unref (stream->download_task);
g_rec_mutex_clear (&stream->download_task_lock);
}
g_cond_clear (&stream->download_cond);
g_mutex_clear (&stream->download_mutex);
g_free (stream);
}
@ -1630,26 +1675,43 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
GstDashDemux *demux = stream->demux;
GstFlowReturn flow_ret = GST_FLOW_OK;
GST_LOG_OBJECT (demux, "Starting download loop %p %s:%s", stream,
GST_DEBUG_PAD_NAME (stream->pad));
GST_LOG_OBJECT (stream->pad, "Starting download loop");
GST_DASH_DEMUX_DOWNLOAD_LOCK (demux);
GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream);
if (stream->last_ret < GST_FLOW_OK) {
if (demux->cancelled) {
GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
goto cancelled;
}
GST_DEBUG_OBJECT (stream->pad, "Download loop waiting due to flow return: "
"%d %s", stream->last_ret, gst_flow_get_name (stream->last_ret));
gst_task_pause (stream->download_task);
GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
return;
}
GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
if (demux->cancelled) {
goto cancelled;
}
GST_DASH_DEMUX_CLIENT_LOCK (demux);
if (gst_mpd_client_is_live (demux->client)
&& demux->client->mpd_uri != NULL) {
switch (gst_dash_demux_refresh_mpd (demux)) {
case GST_FLOW_EOS:
GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux);
GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
goto end_of_manifest;
default:
break;
}
}
GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest);
GST_DEBUG_OBJECT (stream->pad, "End of manifest: %d", demux->end_of_manifest);
/* try to switch to another set of representations if needed */
gst_dash_demux_stream_select_representation_unlocked (stream);
GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux);
GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
/* fetch the next fragment */
flow_ret = gst_dash_demux_get_next_fragment (demux, stream, &fragment_stream,
@ -1659,13 +1721,14 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
case GST_FLOW_OK:
break;
case GST_FLOW_EOS:
GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream);
if (demux->end_of_period) {
GST_INFO_OBJECT (demux, "Reached the end of the Period");
GST_INFO_OBJECT (stream->pad, "Reached the end of the Period");
/* setup video, audio and subtitle streams, starting from the next Period */
if (!gst_mpd_client_set_period_index (demux->client,
gst_mpd_client_get_period_index (demux->client) + 1)
|| !gst_dash_demux_setup_all_streams (demux)) {
GST_INFO_OBJECT (demux, "Reached the end of the manifest file");
GST_INFO_OBJECT (stream->pad, "Reached the end of the manifest file");
demux->end_of_manifest = TRUE;
gst_task_start (demux->stream_task);
goto end_of_manifest;
@ -1674,6 +1737,8 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
demux->end_of_period = FALSE;
}
gst_task_pause (stream->download_task);
GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
break;
case GST_FLOW_ERROR:
/* Download failed 'by itself'
@ -1687,7 +1752,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
pos =
gst_mpd_client_check_time_position (demux->client, fragment_stream,
fragment_ts, &time_diff);
GST_DEBUG_OBJECT (demux,
GST_DEBUG_OBJECT (stream->pad,
"Checked position for fragment ts %" GST_TIME_FORMAT
", res: %d, diff: %" G_GINT64_FORMAT, GST_TIME_ARGS (fragment_ts),
pos, time_diff);
@ -1697,7 +1762,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
/* we're behind, try moving to the 'present' */
GDateTime *now = g_date_time_new_now_utc ();
GST_DEBUG_OBJECT (demux,
GST_DEBUG_OBJECT (stream->pad,
"Falling behind live stream, moving forward");
gst_mpd_client_seek_to_time (demux->client, now);
g_date_time_unref (now);
@ -1705,10 +1770,11 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
} else if (pos > 0) {
/* we're ahead, wait a little */
GST_DEBUG_OBJECT (demux, "Waiting for next segment to be created");
GST_DEBUG_OBJECT (stream->pad,
"Waiting for next segment to be created");
gst_mpd_client_set_segment_index (fragment_stream,
fragment_stream->segment_idx - 1);
gst_dash_demux_download_wait (demux, time_diff);
gst_dash_demux_download_wait (stream, time_diff);
} else {
gst_mpd_client_set_segment_index (fragment_stream,
fragment_stream->segment_idx - 1);
@ -1719,7 +1785,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
}
if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
GST_WARNING_OBJECT (stream->pad, "Could not fetch the next fragment");
goto quit;
} else {
goto error_downloading;
@ -1733,24 +1799,23 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
goto cancelled;
}
GST_INFO_OBJECT (demux, "Internal buffering : %" G_GUINT64_FORMAT " s",
GST_INFO_OBJECT (stream->pad, "Internal buffering : %" G_GUINT64_FORMAT " s",
gst_dash_demux_get_buffering_time (demux) / GST_SECOND);
demux->client->update_failed_count = 0;
quit:
GST_DEBUG_OBJECT (demux, "Finishing download loop");
GST_DEBUG_OBJECT (stream->pad, "Finishing download loop");
return;
cancelled:
{
GST_WARNING_OBJECT (demux, "Cancelled, leaving download task");
gst_task_stop (stream->download_task);
GST_WARNING_OBJECT (stream->pad, "Cancelled, leaving download task");
return;
}
end_of_manifest:
{
GST_INFO_OBJECT (demux, "End of manifest, leaving download task");
GST_INFO_OBJECT (stream->pad, "End of manifest, leaving download task");
gst_task_stop (stream->download_task);
return;
}
@ -2006,7 +2071,7 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
static void
gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux,
GstActiveStream * stream)
GstDashDemuxStream * dash_stream, GstActiveStream * stream)
{
GstDateTime *seg_end_time;
GstDateTime *cur_time = gst_date_time_new_now_utc ();
@ -2027,26 +2092,82 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux,
GST_DEBUG_OBJECT (demux,
"Selected fragment has end timestamp > now (%" PRIi64
"), delaying download", diff);
gst_dash_demux_download_wait (demux, diff);
gst_dash_demux_download_wait (dash_stream, diff);
}
}
}
static gboolean
gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
GstDashDemuxStream * demux_stream, guint64 * size_buffer,
GstDashDemuxStream * stream, guint64 * size_buffer,
GstClockTime * download_time)
{
GstActiveStream *active_stream;
GstFragment *download;
GTimeVal now;
GTimeVal start;
guint stream_idx = demux_stream->index;
guint stream_idx = stream->index;
GstBuffer *buffer;
GstBuffer *header_buffer;
GstMediaFragmentInfo fragment;
if (G_UNLIKELY (stream->restart_download)) {
GstClockTime cur, ts;
gint64 pos;
GstEvent *gap;
GST_DEBUG_OBJECT (stream->pad,
"Reactivating stream after to reconfigure event");
cur = GST_CLOCK_TIME_IS_VALID (stream->position) ? stream->position : 0;
if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
ts = (GstClockTime) pos;
GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
GST_TIME_FORMAT, GST_TIME_ARGS (ts));
} else {
ts = demux->segment.position;
GST_DEBUG_OBJECT (stream->pad, "Downstream position query failed, "
"failling back to looking at other pads");
}
GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
"position %" GST_TIME_FORMAT ", current catch up %" GST_TIME_FORMAT,
GST_TIME_ARGS (ts), GST_TIME_ARGS (demux->segment.position));
if (GST_CLOCK_TIME_IS_VALID (ts)) {
gst_dash_demux_stream_seek (stream, ts);
if (cur < ts) {
gap = gst_event_new_gap (cur, ts - cur);
gst_pad_push_event (stream->pad, gap);
}
}
/* This stream might be entering into catching up mode,
* meaning that it will push buffers from this same download thread
* until it reaches 'catch_up_timestamp'.
*
* The reason for this is that in case of stream switching, the other
* stream that was previously active might be blocking the stream_loop
* in case it is ahead enough that all queues are filled.
* In this case, it is possible that a downstream input-selector is
* blocking waiting for the currently active stream to reach the
* same position of the old linked stream because of the 'sync-streams'
* behavior.
*
* We can push from this thread up to 'catch_up_timestamp' as all other
* streams should be around the same timestamp.
*/
stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
gst_data_queue_set_flushing (stream->queue, FALSE);
stream->restart_download = FALSE;
gst_task_start (demux->stream_task);
}
if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) {
gboolean catch_up = FALSE;
g_get_current_time (&start);
GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
@ -2100,13 +2221,13 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
}
}
if (demux_stream->need_header) {
if (stream->need_header) {
/* We need to fetch a new header */
if ((header_buffer =
gst_dash_demux_get_next_header (demux, stream_idx)) != NULL) {
buffer = gst_buffer_append (header_buffer, buffer);
}
demux_stream->need_header = FALSE;
stream->need_header = FALSE;
}
g_get_current_time (&now);
*download_time = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start));
@ -2120,12 +2241,36 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
gst_media_fragment_info_clear (&fragment);
gst_dash_demux_stream_push_data (demux_stream, buffer);
demux_stream->has_data_queued = TRUE;
/* Check if this stream is on catch up mode */
if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
GST_DEBUG_OBJECT (stream->pad,
"Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
GST_TIME_ARGS (demux->segment.position),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
if (GST_BUFFER_TIMESTAMP (buffer) < demux->segment.position) {
catch_up = TRUE;
} else {
stream->last_ret = GST_FLOW_OK;
gst_task_start (demux->stream_task);
}
}
if (catch_up) {
GstFlowReturn ret;
ret = gst_pad_push (stream->pad, buffer);
if (G_LIKELY (ret == GST_FLOW_OK))
stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
/* TODO handle return */
} else {
gst_dash_demux_stream_push_data (stream, buffer);
stream->has_data_queued = TRUE;
}
*size_buffer += gst_buffer_get_size (buffer);
} else {
GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d",
demux_stream, demux_stream->index);
stream, stream->index);
}
return TRUE;
}
@ -2196,7 +2341,8 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux,
if (stream && gst_mpd_client_is_live (demux->client) &&
demux->client->mpd_node->minimumUpdatePeriod != -1) {
gst_dash_demux_wait_for_fragment_to_be_available (demux, *active_stream);
gst_dash_demux_wait_for_fragment_to_be_available (demux, stream,
*active_stream);
}
/* Get the fragment corresponding to each stream index */
@ -2229,12 +2375,13 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux,
}
static void
gst_dash_demux_download_wait (GstDashDemux * demux, GstClockTime time_diff)
gst_dash_demux_download_wait (GstDashDemuxStream * stream,
GstClockTime time_diff)
{
gint64 end_time = g_get_monotonic_time () + time_diff / GST_USECOND;
GST_DEBUG_OBJECT (demux, "Download waiting for %" GST_TIME_FORMAT,
GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
GST_TIME_ARGS (time_diff));
g_cond_wait_until (&demux->download_cond, &demux->download_mutex, end_time);
GST_DEBUG_OBJECT (demux, "Download finished waiting");
g_cond_wait_until (&stream->download_cond, &stream->download_mutex, end_time);
GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
}

View file

@ -63,6 +63,10 @@ struct _GstDashDemuxStream
GstCaps *input_caps;
GstFlowReturn last_ret;
GstClockTime position;
gboolean restart_download;
/*
* Need to store the status for the download and
* stream tasks separately as they are working at
@ -94,6 +98,10 @@ struct _GstDashDemuxStream
gboolean has_data_queued;
GstDataQueue *queue;
/* Download task */
GMutex download_mutex;
GCond download_cond;
GstTask *download_task;
GRecMutex download_task_lock;
@ -124,6 +132,8 @@ struct _GstDashDemux
GstBuffer *manifest;
GstUriDownloader *downloader;
GstMpdClient *client; /* MPD client */
GMutex client_lock;
gboolean end_of_period;
gboolean end_of_manifest;
@ -136,9 +146,6 @@ struct _GstDashDemux
GstTask *stream_task;
GRecMutex stream_task_lock;
/* Download task */
GMutex download_mutex;
GCond download_cond;
gboolean cancelled;
/* Manifest update */