mssdemux: synchronize with the download loop thread to signal it to continue

If EOS or ERROR happens before the download loop thread has reached its
g_cond_wait() call, then the g_cond_signal doesn't have any effect and
the download loop thread stucks later.

https://bugzilla.gnome.org/show_bug.cgi?id=735663
This commit is contained in:
George Kiagiadakis 2014-09-12 02:36:47 -03:00 committed by Thiago Santos
parent f4546b64ea
commit e289ab07c1
2 changed files with 66 additions and 31 deletions

View file

@ -420,8 +420,13 @@ gst_mss_demux_handle_message (GstBin * bin, GstMessage * msg)
err->domain, err->code, err->message, debug); err->domain, err->code, err->message, debug);
/* error, but ask to retry */ /* error, but ask to retry */
g_mutex_lock (&stream->fragment_download_lock);
if (!stream->download_finished) {
stream->last_ret = GST_FLOW_CUSTOM_ERROR; stream->last_ret = GST_FLOW_CUSTOM_ERROR;
stream->download_finished = TRUE;
}
g_cond_signal (&stream->fragment_download_cond); g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
g_error_free (err); g_error_free (err);
g_free (debug); g_free (debug);
@ -531,8 +536,12 @@ gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
gst_task_stop (stream->download_task); gst_task_stop (stream->download_task);
stream->cancelled = TRUE; stream->cancelled = TRUE;
if (immediate) if (immediate) {
g_mutex_lock (&stream->fragment_download_lock);
stream->download_finished = TRUE;
g_cond_signal (&stream->fragment_download_cond); g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
}
} }
GST_OBJECT_UNLOCK (mssdemux); GST_OBJECT_UNLOCK (mssdemux);
@ -635,22 +644,26 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
case GST_EVENT_RECONFIGURE:{ case GST_EVENT_RECONFIGURE:{
GSList *iter; GSList *iter;
GST_OBJECT_LOCK (mssdemux);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data; GstMssDemuxStream *stream = iter->data;
if (stream->pad == pad) { if (stream->pad == pad) {
GST_OBJECT_LOCK (mssdemux);
g_mutex_lock (&stream->fragment_download_lock);
if (stream->last_ret == GST_FLOW_NOT_LINKED) { if (stream->last_ret == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (stream->pad, "Received reconfigure"); GST_DEBUG_OBJECT (stream->pad, "Received reconfigure");
stream->restart_download = TRUE; stream->restart_download = TRUE;
stream->last_ret = GST_FLOW_OK;
gst_task_start (stream->download_task); gst_task_start (stream->download_task);
} }
g_mutex_unlock (&stream->fragment_download_lock);
GST_OBJECT_UNLOCK (mssdemux); GST_OBJECT_UNLOCK (mssdemux);
gst_event_unref (event); gst_event_unref (event);
return TRUE; return TRUE;
} }
} }
GST_OBJECT_UNLOCK (mssdemux);
} }
break; break;
case GST_EVENT_LATENCY:{ case GST_EVENT_LATENCY:{
@ -1165,10 +1178,14 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
/* TODO properly stop tasks */ /* TODO properly stop tasks */
/* gst_hls_demux_pause_tasks (demux); */ /* gst_hls_demux_pause_tasks (demux); */
g_mutex_lock (&stream->fragment_download_lock);
stream->download_finished = TRUE;
stream->last_ret = ret;
g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
} }
/* avoid having the source handle the same error again */ /* avoid having the source handle the same error again */
stream->last_ret = ret;
ret = GST_FLOW_OK; ret = GST_FLOW_OK;
return ret; return ret;
@ -1182,7 +1199,10 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS: case GST_EVENT_EOS:
g_mutex_lock (&stream->fragment_download_lock);
stream->download_finished = TRUE;
g_cond_signal (&stream->fragment_download_cond); g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
break; break;
default: default:
break; break;
@ -1241,7 +1261,6 @@ gst_mss_demux_stream_update_source (GstMssDemuxStream * stream,
if (!gst_uri_is_valid (uri)) { if (!gst_uri_is_valid (uri)) {
GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri); GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
stream->last_ret = GST_FLOW_ERROR;
return FALSE; return FALSE;
} }
@ -1340,16 +1359,21 @@ gst_mss_demux_stream_update_source (GstMssDemuxStream * stream,
return TRUE; return TRUE;
} }
/* must be called with the stream's fragment_download_lock */ static GstFlowReturn
static void
gst_mss_demux_stream_download_uri (GstMssDemux * demux, gst_mss_demux_stream_download_uri (GstMssDemux * demux,
GstMssDemuxStream * stream, const gchar * uri, gint64 start, gint64 end) GstMssDemuxStream * stream, const gchar * uri, gint64 start, gint64 end)
{ {
GstFlowReturn ret = GST_FLOW_OK;
GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT
" - %" G_GINT64_FORMAT, uri, start, end); " - %" G_GINT64_FORMAT, uri, start, end);
stream->download_finished = FALSE;
if (!gst_mss_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) { if (!gst_mss_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
return; g_mutex_lock (&stream->fragment_download_lock);
ret = stream->last_ret = GST_FLOW_ERROR;
g_mutex_unlock (&stream->fragment_download_lock);
return ret;
} }
if (gst_element_set_state (stream->src, if (gst_element_set_state (stream->src,
@ -1368,18 +1392,32 @@ gst_mss_demux_stream_download_uri (GstMssDemux * demux,
} }
} }
g_mutex_lock (&stream->fragment_download_lock);
if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) { if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
stream->download_start_time = g_get_monotonic_time (); stream->download_start_time = g_get_monotonic_time ();
g_mutex_unlock (&stream->fragment_download_lock);
gst_element_sync_state_with_parent (stream->src); gst_element_sync_state_with_parent (stream->src);
g_mutex_lock (&stream->fragment_download_lock);
/* wait for the fragment to be completely downloaded */ /* wait for the fragment to be completely downloaded */
GST_DEBUG_OBJECT (stream->pad, GST_DEBUG_OBJECT (stream->pad,
"Waiting for fragment download to finish: %s", uri); "Waiting for fragment download to finish: %s", uri);
while (!stream->cancelled && !stream->download_finished) {
g_cond_wait (&stream->fragment_download_cond, g_cond_wait (&stream->fragment_download_cond,
&stream->fragment_download_lock); &stream->fragment_download_lock);
} }
ret = stream->last_ret;
g_mutex_unlock (&stream->fragment_download_lock);
GST_DEBUG_OBJECT (stream->pad, "Fragment download finished: %s", uri);
}
g_mutex_unlock (&stream->fragment_download_lock);
} else { } else {
stream->last_ret = GST_FLOW_CUSTOM_ERROR; g_mutex_lock (&stream->fragment_download_lock);
ret = stream->last_ret = GST_FLOW_CUSTOM_ERROR;
g_mutex_unlock (&stream->fragment_download_lock);
} }
/* flush the proxypads so that the EOS state is reset */ /* flush the proxypads so that the EOS state is reset */
@ -1387,6 +1425,8 @@ gst_mss_demux_stream_download_uri (GstMssDemux * demux,
gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_stop (TRUE)); gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_stop (TRUE));
gst_element_set_state (stream->src, GST_STATE_READY); gst_element_set_state (stream->src, GST_STATE_READY);
gst_mss_demux_stream_clear_eos_state (stream); gst_mss_demux_stream_clear_eos_state (stream);
return ret;
} }
static GstFlowReturn static GstFlowReturn
@ -1397,14 +1437,6 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream)
gchar *url = NULL; gchar *url = NULL;
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
/* special case for not-linked streams */
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (stream->pad, "Skipping download for not-linked stream %p",
stream);
return GST_FLOW_NOT_LINKED;
}
g_mutex_lock (&stream->fragment_download_lock);
GST_DEBUG_OBJECT (stream->pad, "Getting url for stream"); GST_DEBUG_OBJECT (stream->pad, "Getting url for stream");
ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path); ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
switch (ret) { switch (ret) {
@ -1414,13 +1446,10 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream)
g_free (path); g_free (path);
if (gst_mss_manifest_is_live (mssdemux->manifest)) { if (gst_mss_manifest_is_live (mssdemux->manifest)) {
gst_mss_demux_reload_manifest (mssdemux); gst_mss_demux_reload_manifest (mssdemux);
g_mutex_unlock (&stream->fragment_download_lock);
return GST_FLOW_OK; return GST_FLOW_OK;
} }
g_mutex_unlock (&stream->fragment_download_lock);
return GST_FLOW_EOS; return GST_FLOW_EOS;
case GST_FLOW_ERROR: case GST_FLOW_ERROR:
g_mutex_unlock (&stream->fragment_download_lock);
g_free (path); g_free (path);
goto error; goto error;
default: default:
@ -1435,26 +1464,28 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream)
GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream); GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream);
g_mutex_lock (&stream->fragment_download_lock);
stream->last_ret = GST_FLOW_OK;
stream->starting_fragment = TRUE; stream->starting_fragment = TRUE;
g_mutex_unlock (&stream->fragment_download_lock);
gst_mss_demux_stream_download_uri (mssdemux, stream, url, 0, -1); ret = gst_mss_demux_stream_download_uri (mssdemux, stream, url, 0, -1);
g_free (path); g_free (path);
g_free (url); g_free (url);
g_mutex_unlock (&stream->fragment_download_lock);
if (stream->last_ret != GST_FLOW_OK) { if (ret != GST_FLOW_OK) {
GST_INFO_OBJECT (mssdemux, "No fragment downloaded"); GST_INFO_OBJECT (mssdemux, "No fragment downloaded (%d %s)", ret,
gst_flow_get_name (ret));
/* TODO check if we are truly stoping */ /* TODO check if we are truly stoping */
if (stream->last_ret != GST_FLOW_ERROR if (ret != GST_FLOW_ERROR && gst_mss_manifest_is_live (mssdemux->manifest)) {
&& gst_mss_manifest_is_live (mssdemux->manifest)) {
/* looks like there is no way of knowing when a live stream has ended /* looks like there is no way of knowing when a live stream has ended
* Have to assume we are falling behind and cause a manifest reload */ * Have to assume we are falling behind and cause a manifest reload */
return GST_FLOW_OK; return GST_FLOW_OK;
} }
} }
return stream->last_ret; return ret;
no_url_error: no_url_error:
{ {
@ -1539,7 +1570,6 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
} }
stream->restart_download = FALSE; stream->restart_download = FALSE;
stream->last_ret = GST_FLOW_OK;
} }
capsevent = gst_mss_demux_reconfigure_stream (stream); capsevent = gst_mss_demux_reconfigure_stream (stream);
GST_OBJECT_UNLOCK (mssdemux); GST_OBJECT_UNLOCK (mssdemux);
@ -1637,13 +1667,17 @@ gst_mss_demux_combine_flows (GstMssDemux * mssdemux)
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data; GstMssDemuxStream *stream = iter->data;
g_mutex_lock (&stream->fragment_download_lock);
if (stream->last_ret != GST_FLOW_NOT_LINKED) if (stream->last_ret != GST_FLOW_NOT_LINKED)
all_notlinked = FALSE; all_notlinked = FALSE;
if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
|| stream->last_ret == GST_FLOW_FLUSHING) || stream->last_ret == GST_FLOW_FLUSHING) {
g_mutex_unlock (&stream->fragment_download_lock);
return stream->last_ret; return stream->last_ret;
} }
g_mutex_unlock (&stream->fragment_download_lock);
}
if (all_notlinked) if (all_notlinked)
return GST_FLOW_NOT_LINKED; return GST_FLOW_NOT_LINKED;
return GST_FLOW_OK; return GST_FLOW_OK;

View file

@ -85,6 +85,7 @@ struct _GstMssDemuxStream {
GstPad *src_srcpad; GstPad *src_srcpad;
GMutex fragment_download_lock; GMutex fragment_download_lock;
GCond fragment_download_cond; GCond fragment_download_cond;
gboolean download_finished;
gboolean starting_fragment; gboolean starting_fragment;
gint64 download_start_time; gint64 download_start_time;
gint64 download_total_time; gint64 download_total_time;