adaptivedemux: Add more safeguards around state changes.

Make state changes of internal elements more reliable by locking
their state, and ensuring that they aren't blocked pushing data
downstream before trying to set their state.

Add a boolean to avoid starting tasks when the main
thread is busy trying to shut the element down.
This commit is contained in:
Jan Schmidt 2016-07-13 23:02:10 +10:00
parent 5fc21d5c74
commit 585e60c4ab
2 changed files with 57 additions and 16 deletions

View file

@ -531,9 +531,15 @@ gst_adaptive_demux_change_state (GstElement * element,
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
GST_MANIFEST_LOCK (demux);
demux->running = FALSE;
gst_adaptive_demux_reset (demux);
GST_MANIFEST_UNLOCK (demux);
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_MANIFEST_LOCK (demux);
demux->running = TRUE;
GST_MANIFEST_UNLOCK (demux);
break;
default:
break;
}
@ -944,20 +950,10 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
demux->streams = demux->next_streams;
demux->next_streams = NULL;
/* First ensure all on-going downloads are finished or cancelled */
GST_MANIFEST_UNLOCK (demux);
for (iter = old_streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
g_mutex_lock (&stream->fragment_download_lock);
while (!stream->cancelled && !stream->download_finished) {
GST_DEBUG_OBJECT (stream->pad,
"Waiting for download on active stream to finish");
g_cond_wait (&stream->fragment_download_cond,
&stream->fragment_download_lock);
}
g_mutex_unlock (&stream->fragment_download_lock);
if (!demux->running) {
GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
return TRUE;
}
GST_MANIFEST_LOCK (demux);
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
@ -1237,6 +1233,7 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
if (stream->src) {
GST_MANIFEST_UNLOCK (demux);
gst_element_set_locked_state (stream->src, TRUE);
gst_element_set_state (stream->src, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (demux), stream->src);
stream->src = NULL;
@ -1528,7 +1525,8 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
if (stream) {
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
if (!stream->cancelled && demux->running &&
stream->last_ret == GST_FLOW_NOT_LINKED) {
stream->last_ret = GST_FLOW_OK;
stream->restart_download = TRUE;
stream->need_header = TRUE;
@ -1675,6 +1673,11 @@ gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
{
GList *iter;
if (!demux->running) {
GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
return;
}
GST_INFO_OBJECT (demux, "Starting streams' tasks");
for (iter = demux->streams; iter; iter = g_list_next (iter)) {
GstAdaptiveDemuxStream *stream = iter->data;
@ -1736,6 +1739,7 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
GST_MANIFEST_UNLOCK (demux);
if (src) {
gst_element_set_locked_state (stream->src, TRUE);
gst_element_set_state (src, GST_STATE_READY);
}
@ -2387,6 +2391,7 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
if (!g_str_equal (old_protocol, new_protocol)) {
gst_object_unref (stream->src_srcpad);
gst_element_set_locked_state (stream->src, TRUE);
gst_element_set_state (stream->src, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (demux), stream->src);
stream->src = NULL;
@ -2402,6 +2407,7 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
err->message);
g_clear_error (&err);
gst_object_unref (stream->src_srcpad);
gst_element_set_locked_state (stream->src, TRUE);
gst_element_set_state (stream->src, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (demux), stream->src);
stream->src = NULL;
@ -2542,6 +2548,22 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
return TRUE;
}
static GstPadProbeReturn
gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
gpointer user_data)
{
GstAdaptiveDemuxStream *stream = user_data;
/* The source's src pad is IDLE so now set the state to READY */
g_mutex_lock (&stream->fragment_download_lock);
stream->src_at_ready = TRUE;
g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock);
return GST_PAD_PROBE_OK;
}
/* must be called with manifest_lock taken.
* Can temporarily release manifest_lock
*/
@ -2559,6 +2581,8 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
return ret;
}
gst_element_set_locked_state (stream->src, TRUE);
if (gst_element_set_state (stream->src,
GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
if (start != 0 || end != -1) {
@ -2608,6 +2632,7 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
"Waiting for fragment download to finish: %s", uri);
g_mutex_lock (&stream->fragment_download_lock);
stream->src_at_ready = FALSE;
if (G_UNLIKELY (stream->cancelled)) {
g_mutex_unlock (&stream->fragment_download_lock);
GST_MANIFEST_LOCK (demux);
@ -2645,10 +2670,22 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
*/
GST_MANIFEST_UNLOCK (demux);
/* FIXME: Wait until the src pad is IDLE, as it might be blocked
* downstream indefinitely here */
stream->src_at_ready = FALSE;
gst_element_set_locked_state (stream->src, TRUE);
gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
gst_ad_stream_src_to_ready_cb, stream, NULL);
g_mutex_lock (&stream->fragment_download_lock);
while (!stream->src_at_ready) {
g_cond_wait (&stream->fragment_download_cond,
&stream->fragment_download_lock);
}
g_mutex_unlock (&stream->fragment_download_lock);
gst_element_set_state (stream->src, GST_STATE_READY);
/* Need to drop the fragment_download_lock to get the MANIFEST lock */
GST_MANIFEST_LOCK (demux);
g_mutex_lock (&stream->fragment_download_lock);
if (G_UNLIKELY (stream->cancelled)) {
@ -3135,6 +3172,7 @@ download_error:
gst_task_stop (stream->download_task);
if (stream->src) {
gst_element_set_locked_state (stream->src, TRUE);
gst_element_set_state (stream->src, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (demux), stream->src);
stream->src = NULL;

View file

@ -152,6 +152,7 @@ struct _GstAdaptiveDemuxStream
GCond fragment_download_cond;
gboolean download_finished; /* protected by fragment_download_lock */
gboolean cancelled; /* protected by fragment_download_lock */
gboolean src_at_ready; /* protected by fragment_download_lock */
gboolean starting_fragment;
gboolean first_fragment_buffer;
gint64 download_start_time;
@ -183,6 +184,8 @@ struct _GstAdaptiveDemux
/*< private >*/
GstBin bin;
gboolean running;
gsize stream_struct_size;
/*< protected >*/