dashdemux: synchronize with the download loop thread to signal it to continue

If EOS or ERROR happens before the download loop thread has reached its
g_cond_wait() call, then the g_cond_signal doesn't have any effect and
the download loop thread stucks later.

https://bugzilla.gnome.org/show_bug.cgi?id=735663
This commit is contained in:
George Kiagiadakis 2014-08-29 12:38:12 +02:00 committed by Thiago Santos
parent 01ccac24fa
commit 55032ae5fe
2 changed files with 88 additions and 37 deletions

View file

@ -220,7 +220,7 @@ static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream
static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
static void gst_dash_demux_download_wait (GstDashDemuxStream * stream,
GstClockTime time_diff);
static void gst_dash_demux_stream_download_uri (GstDashDemux * demux,
static GstFlowReturn gst_dash_demux_stream_download_uri (GstDashDemux * demux,
GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end);
static void gst_dash_demux_expose_streams (GstDashDemux * demux);
@ -236,8 +236,7 @@ static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux,
#define gst_dash_demux_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstDashDemux, gst_dash_demux, GST_TYPE_BIN,
GST_DEBUG_CATEGORY_INIT (gst_dash_demux_debug, "dashdemux", 0,
"dashdemux element");
);
"dashdemux element"););
static void
gst_dash_demux_dispose (GObject * obj)
@ -428,8 +427,13 @@ gst_dash_demux_handle_message (GstBin * bin, GstMessage * msg)
err->domain, err->code, err->message, debug);
/* error, but ask to retry */
stream->last_ret = GST_FLOW_CUSTOM_ERROR;
g_mutex_lock (&stream->fragment_download_lock);
if (!stream->download_finished) {
stream->last_ret = GST_FLOW_CUSTOM_ERROR;
stream->download_finished = TRUE;
}
g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
g_error_free (err);
g_free (debug);
@ -601,12 +605,16 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
GstDashDemuxStream *stream = iter->data;
if (stream->pad == pad) {
g_mutex_lock (&stream->fragment_download_lock);
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
stream->last_ret = GST_FLOW_OK;
stream->restart_download = TRUE;
stream->need_header = TRUE;
g_mutex_unlock (&stream->fragment_download_lock);
GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
gst_task_start (stream->download_task);
} else {
g_mutex_unlock (&stream->fragment_download_lock);
}
GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
gst_event_unref (event);
@ -662,11 +670,15 @@ gst_dash_demux_all_streams_eop (GstDashDemux * demux)
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
g_mutex_lock (&stream->fragment_download_lock);
GST_LOG_OBJECT (stream->pad, "EOP: %d (linked: %d)", stream->stream_eos,
stream->last_ret != GST_FLOW_NOT_LINKED);
if (!stream->stream_eos && stream->last_ret != GST_FLOW_NOT_LINKED)
if (!stream->stream_eos && stream->last_ret != GST_FLOW_NOT_LINKED) {
g_mutex_unlock (&stream->fragment_download_lock);
return FALSE;
}
g_mutex_unlock (&stream->fragment_download_lock);
}
return TRUE;
@ -724,6 +736,8 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
g_cond_init (&stream->fragment_download_cond);
g_mutex_init (&stream->fragment_download_lock);
stream->download_finished = FALSE;
GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps);
streams = g_slist_prepend (streams, stream);
stream->pad = gst_dash_demux_create_pad (demux, stream);
@ -1076,12 +1090,15 @@ gst_dash_demux_stop (GstDashDemux * demux)
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
stream->last_ret = GST_FLOW_FLUSHING;
stream->need_header = TRUE;
gst_task_stop (stream->download_task);
GST_TASK_SIGNAL (stream->download_task);
gst_element_set_state (stream->src, GST_STATE_READY);
g_mutex_lock (&stream->fragment_download_lock);
stream->download_finished = TRUE;
stream->last_ret = GST_FLOW_FLUSHING;
g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
}
}
@ -1131,7 +1148,7 @@ gst_dash_demux_remove_streams (GstDashDemux * demux, GSList * streams)
GstEvent *eos = gst_event_new_eos ();
for (iter = streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;;
GstDashDemuxStream *stream = iter->data;
GST_LOG_OBJECT (stream->pad, "Removing stream %d %" GST_PTR_FORMAT,
stream->index, stream->input_caps);
@ -1450,12 +1467,16 @@ gst_dash_demux_combine_flows (GstDashDemux * demux)
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
g_mutex_lock (&stream->fragment_download_lock);
if (stream->last_ret != GST_FLOW_NOT_LINKED)
all_notlinked = FALSE;
if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
|| stream->last_ret == GST_FLOW_FLUSHING)
|| stream->last_ret == GST_FLOW_FLUSHING) {
g_mutex_unlock (&stream->fragment_download_lock);
return stream->last_ret;
}
g_mutex_unlock (&stream->fragment_download_lock);
}
if (all_notlinked)
return GST_FLOW_NOT_LINKED;
@ -1524,7 +1545,6 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
goto cancelled;
}
stream->last_ret = flow_ret;
switch (flow_ret) {
case GST_FLOW_OK:
break;
@ -1627,13 +1647,13 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
break;
}
if (G_LIKELY (stream->last_ret != GST_FLOW_ERROR))
if (G_LIKELY (flow_ret != GST_FLOW_ERROR))
demux->client->update_failed_count = 0;
quit:
GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
if (G_UNLIKELY (stream->last_ret == GST_FLOW_EOS)) {
if (G_UNLIKELY (flow_ret == GST_FLOW_EOS)) {
gst_pad_push_event (stream->pad, gst_event_new_eos ());
}
@ -1753,12 +1773,13 @@ gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
return NULL;
}
static void
static GstFlowReturn
gst_dash_demux_download_header_fragment (GstDashDemux * demux,
GstDashDemuxStream * stream, gchar * path, gint64 range_start,
gint64 range_end)
{
gchar *next_header_uri;
GstFlowReturn ret;
if (strncmp (path, "http://", 7) != 0) {
next_header_uri =
@ -1769,37 +1790,40 @@ gst_dash_demux_download_header_fragment (GstDashDemux * demux,
next_header_uri = path;
}
gst_dash_demux_stream_download_uri (demux, stream, next_header_uri,
ret = gst_dash_demux_stream_download_uri (demux, stream, next_header_uri,
range_start, range_end);
g_free (next_header_uri);
return ret;
}
static void
static GstFlowReturn
gst_dash_demux_get_next_header (GstDashDemux * demux,
GstDashDemuxStream * stream)
{
gchar *initializationURL;
gint64 range_start, range_end;
GstFlowReturn ret;
if (!gst_mpd_client_get_next_header (demux->client, &initializationURL,
stream->index, &range_start, &range_end))
return;
return GST_FLOW_OK;
GST_INFO_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
G_GINT64_FORMAT, initializationURL, range_start, range_end);
gst_dash_demux_download_header_fragment (demux, stream,
ret = gst_dash_demux_download_header_fragment (demux, stream,
initializationURL, range_start, range_end);
/* check if we have an index */
if (!demux->cancelled && stream->last_ret == GST_FLOW_OK /* TODO check for other valid types */
if (!demux->cancelled && ret == GST_FLOW_OK /* TODO check for other valid types */
&& gst_mpd_client_get_next_header_index (demux->client,
&initializationURL, stream->index, &range_start, &range_end)) {
GST_INFO_OBJECT (demux,
"Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
initializationURL, range_start, range_end);
gst_dash_demux_download_header_fragment (demux, stream,
ret = gst_dash_demux_download_header_fragment (demux, stream,
initializationURL, range_start, range_end);
}
return ret;
}
static GstCaps *
@ -2001,11 +2025,13 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
/* TODO properly stop tasks */
/* gst_hls_demux_pause_tasks (demux); */
g_mutex_lock (&stream->fragment_download_lock);
stream->last_ret = ret;
stream->download_finished = TRUE;
g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
}
stream->last_ret = ret;
return ret;
}
@ -2017,7 +2043,10 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
g_mutex_lock (&stream->fragment_download_lock);
stream->download_finished = TRUE;
g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
break;
default:
break;
@ -2076,7 +2105,6 @@ gst_dash_demux_stream_update_source (GstDashDemuxStream * stream,
if (!gst_uri_is_valid (uri)) {
GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
stream->last_ret = GST_FLOW_ERROR;
return FALSE;
}
@ -2177,16 +2205,21 @@ gst_dash_demux_stream_update_source (GstDashDemuxStream * stream,
return TRUE;
}
/* must be called with the stream's fragment_download_lock */
static void
static GstFlowReturn
gst_dash_demux_stream_download_uri (GstDashDemux * demux,
GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end)
{
GstFlowReturn ret = GST_FLOW_OK;
GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT
" - %" G_GINT64_FORMAT, uri, start, end);
stream->download_finished = FALSE;
if (!gst_dash_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
return;
g_mutex_lock (&stream->fragment_download_lock);
ret = stream->last_ret = GST_FLOW_ERROR;
g_mutex_unlock (&stream->fragment_download_lock);
return ret;
}
if (gst_element_set_state (stream->src,
@ -2205,18 +2238,30 @@ gst_dash_demux_stream_download_uri (GstDashDemux * demux,
}
}
g_mutex_lock (&stream->fragment_download_lock);
if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
g_mutex_unlock (&stream->fragment_download_lock);
stream->download_start_time = g_get_monotonic_time ();
gst_element_sync_state_with_parent (stream->src);
g_mutex_lock (&stream->fragment_download_lock);
/* wait for the fragment to be completely downloaded */
GST_DEBUG_OBJECT (stream->pad,
"Waiting for fragment download to finish: %s", uri);
g_cond_wait (&stream->fragment_download_cond,
&stream->fragment_download_lock);
while (!demux->cancelled && !stream->download_finished) {
g_cond_wait (&stream->fragment_download_cond,
&stream->fragment_download_lock);
}
ret = stream->last_ret;
GST_DEBUG_OBJECT (stream->pad, "Fragment download finished: %s", uri);
}
g_mutex_unlock (&stream->fragment_download_lock);
} else {
stream->last_ret = GST_FLOW_CUSTOM_ERROR;
g_mutex_lock (&stream->fragment_download_lock);
ret = stream->last_ret = GST_FLOW_CUSTOM_ERROR;
g_mutex_unlock (&stream->fragment_download_lock);
}
/* flush the proxypads so that the EOS state is reset */
@ -2224,6 +2269,8 @@ gst_dash_demux_stream_download_uri (GstDashDemux * demux,
gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_stop (TRUE));
gst_element_set_state (stream->src, GST_STATE_READY);
gst_dash_demux_stream_clear_eos_state (stream);
return ret;
}
static void
@ -2233,6 +2280,7 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
GstActiveStream *active_stream;
guint stream_idx = stream->index;
GstMediaFragmentInfo *fragment = &stream->current_fragment;
GstFlowReturn ret = GST_FLOW_OK;
if (G_UNLIKELY (stream->restart_download)) {
GstClockTime cur, ts;
@ -2294,7 +2342,6 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
stream->restart_download = FALSE;
}
g_mutex_lock (&stream->fragment_download_lock);
if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, fragment,
stream->demux->segment.rate > 0.0)) {
GST_INFO_OBJECT (stream->pad,
@ -2305,16 +2352,18 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
fragment->range_start, fragment->range_end);
/* Reset last flow return */
g_mutex_lock (&stream->fragment_download_lock);
stream->last_ret = GST_FLOW_OK;
stream->starting_fragment = TRUE;
g_mutex_unlock (&stream->fragment_download_lock);
if (stream->need_header) {
/* We need to fetch a new header */
gst_dash_demux_get_next_header (demux, stream);
ret = gst_dash_demux_get_next_header (demux, stream);
stream->need_header = FALSE;
}
if (stream->last_ret != GST_FLOW_OK) {
if (ret != GST_FLOW_OK) {
GST_WARNING_OBJECT (stream->pad, "Failed to download headers");
goto exit;
}
@ -2334,11 +2383,11 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
"Fragment index download: %s %" G_GINT64_FORMAT "-%"
G_GINT64_FORMAT, uri, fragment->index_range_start,
fragment->index_range_end);
gst_dash_demux_stream_download_uri (demux, stream, uri,
ret = gst_dash_demux_stream_download_uri (demux, stream, uri,
fragment->index_range_start, fragment->index_range_end);
}
if (stream->last_ret != GST_FLOW_OK) {
if (ret != GST_FLOW_OK) {
GST_WARNING_OBJECT (stream->pad, "Failed to download fragment headers");
goto exit;
}
@ -2347,10 +2396,10 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
goto exit;
/* now get the real fragment */
gst_dash_demux_stream_download_uri (demux, stream, fragment->uri,
ret = gst_dash_demux_stream_download_uri (demux, stream, fragment->uri,
fragment->range_start, fragment->range_end);
if (stream->last_ret < GST_FLOW_EOS) {
if (ret < GST_FLOW_EOS) {
gst_media_fragment_info_clear (fragment);
goto exit;
}
@ -2358,11 +2407,14 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
active_stream = stream->active_stream;
if (active_stream == NULL) {
gst_media_fragment_info_clear (fragment);
g_mutex_lock (&stream->fragment_download_lock);
ret = GST_FLOW_ERROR;
stream->last_ret = GST_FLOW_ERROR;
g_mutex_unlock (&stream->fragment_download_lock);
goto exit;
}
if (stream->last_ret == GST_FLOW_OK) {
if (ret == GST_FLOW_OK) {
stream->position += fragment->duration;
}
@ -2374,8 +2426,6 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
exit:
g_mutex_unlock (&stream->fragment_download_lock);
if (stream->last_ret == GST_FLOW_OK)
gst_element_set_state (stream->src, GST_STATE_READY);
else

View file

@ -86,6 +86,7 @@ struct _GstDashDemuxStream
GstPad *src_srcpad;
GMutex fragment_download_lock;
GCond fragment_download_cond;
gboolean download_finished;
GstMediaFragmentInfo current_fragment;
gboolean starting_fragment;
gint64 download_start_time;