hlsdemux: synchronize with the download loop thread to signal it to continue

If EOS or ERROR happens before the download loop thread has reached its
g_cond_wait() call, then the g_cond_signal doesn't have any effect and
the download loop thread stucks later.

https://bugzilla.gnome.org/show_bug.cgi?id=735663
This commit is contained in:
George Kiagiadakis 2014-09-12 02:35:44 -03:00 committed by Thiago Santos
parent 55032ae5fe
commit f4546b64ea
2 changed files with 60 additions and 12 deletions

View file

@ -377,6 +377,7 @@ gst_hls_demux_handle_message (GstBin * bin, GstMessage * msg)
demux->last_ret = GST_FLOW_CUSTOM_ERROR;
g_clear_error (&demux->last_error);
demux->last_error = g_error_copy (err);
demux->download_finished = TRUE;
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
@ -762,6 +763,7 @@ gst_hls_demux_pause_tasks (GstHLSDemux * demux)
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_lock);
g_mutex_lock (&demux->fragment_download_lock);
demux->download_finished = TRUE;
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
gst_task_pause (demux->stream_task);
@ -788,6 +790,7 @@ gst_hls_demux_stop (GstHLSDemux * demux)
g_cond_signal (&demux->download_cond);
g_mutex_unlock (&demux->download_lock);
g_mutex_lock (&demux->fragment_download_lock);
demux->download_finished = TRUE;
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
gst_task_stop (demux->stream_task);
@ -868,7 +871,12 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
("decryption failed %s", err->message));
g_error_free (err);
g_mutex_lock (&demux->fragment_download_lock);
demux->last_ret = GST_FLOW_ERROR;
demux->download_finished = TRUE;
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
return GST_FLOW_ERROR;
}
@ -906,7 +914,12 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND,
("Could not determine type of stream"), (NULL));
gst_buffer_unref (buffer);
g_mutex_lock (&demux->fragment_download_lock);
demux->last_ret = GST_FLOW_NOT_NEGOTIATED;
demux->download_finished = TRUE;
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
return GST_FLOW_NOT_NEGOTIATED;
}
@ -956,10 +969,15 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
gst_flow_get_name (ret));
}
gst_hls_demux_pause_tasks (demux);
g_mutex_lock (&demux->fragment_download_lock);
demux->last_ret = ret;
demux->download_finished = TRUE;
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
}
/* avoid having the source handle the same error again */
demux->last_ret = ret;
ret = GST_FLOW_OK;
return ret;
@ -967,7 +985,11 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
key_failed:
/* TODO Raise this error to the user */
GST_WARNING_OBJECT (demux, "Failed to decrypt data");
g_mutex_lock (&demux->fragment_download_lock);
demux->last_ret = GST_FLOW_ERROR;
demux->download_finished = TRUE;
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
return GST_FLOW_ERROR;
}
@ -978,10 +1000,15 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
GstHLSDemux *demux = (GstHLSDemux *) GST_PAD_PARENT (srcpad);;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
case GST_EVENT_EOS:{
GstFlowReturn ret;
if (demux->current_key)
gst_hls_demux_decrypt_end (demux);
g_mutex_lock (&demux->fragment_download_lock);
ret = demux->last_ret;
g_mutex_unlock (&demux->fragment_download_lock);
/* ideally this should be empty, but this eos might have been
* caused by an error on the source element */
GST_DEBUG_OBJECT (demux, "Data still on the adapter when EOS was received"
@ -989,7 +1016,7 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_adapter_clear (demux->adapter);
/* pending buffer is only used for encrypted streams */
if (demux->last_ret == GST_FLOW_OK) {
if (ret == GST_FLOW_OK) {
if (demux->pending_buffer) {
GstMapInfo info;
gsize unpadded_size;
@ -1005,7 +1032,7 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
g_get_monotonic_time () - demux->download_start_time;
demux->download_total_bytes +=
gst_buffer_get_size (demux->pending_buffer);
demux->last_ret = gst_pad_push (demux->srcpad, demux->pending_buffer);
ret = gst_pad_push (demux->srcpad, demux->pending_buffer);
demux->pending_buffer = NULL;
}
@ -1018,9 +1045,12 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
GST_DEBUG_OBJECT (demux, "Fragment download finished");
g_mutex_lock (&demux->fragment_download_lock);
demux->last_ret = ret;
demux->download_finished = TRUE;
g_cond_signal (&demux->fragment_download_cond);
g_mutex_unlock (&demux->fragment_download_lock);
break;
}
default:
break;
}
@ -2065,8 +2095,10 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
const gchar *key = NULL;
const guint8 *iv = NULL;
GstClockTime download_start_time = 0;
GstFlowReturn ret = GST_FLOW_OK;
*end_of_playlist = FALSE;
demux->download_finished = FALSE;
if (!gst_m3u8_client_get_next_fragment (demux->client, &discont,
&next_fragment_uri, &duration, &timestamp, &range_start, &range_end,
&key, &iv, demux->segment.rate > 0)) {
@ -2075,7 +2107,6 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
return FALSE;
}
g_mutex_lock (&demux->fragment_download_lock);
GST_DEBUG_OBJECT (demux,
"Fetching next fragment %s %" GST_TIME_FORMAT "(range=%" G_GINT64_FORMAT
"-%" G_GINT64_FORMAT ")", next_fragment_uri, GST_TIME_ARGS (timestamp),
@ -2090,8 +2121,10 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
demux->current_iv = iv;
/* Reset last flow return */
g_mutex_lock (&demux->fragment_download_lock);
demux->last_ret = GST_FLOW_OK;
g_clear_error (&demux->last_error);
g_mutex_unlock (&demux->fragment_download_lock);
if (!gst_hls_demux_update_source (demux, next_fragment_uri,
demux->client->main ? demux->client->main->uri : NULL,
@ -2100,7 +2133,6 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
*err =
g_error_new (GST_CORE_ERROR, GST_CORE_ERROR_MISSING_PLUGIN,
"Missing plugin to handle URI: '%s'", next_fragment_uri);
g_mutex_unlock (&demux->fragment_download_lock);
return FALSE;
}
@ -2115,35 +2147,49 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
range_end))) {
/* looks like the source can't handle seeks in READY */
g_mutex_lock (&demux->fragment_download_lock);
*err = g_error_new (GST_CORE_ERROR, GST_CORE_ERROR_NOT_IMPLEMENTED,
"Source element can't handle range requests");
demux->last_ret = GST_FLOW_ERROR;
g_mutex_unlock (&demux->fragment_download_lock);
return FALSE;
}
}
g_mutex_lock (&demux->fragment_download_lock);
if (G_LIKELY (demux->last_ret == GST_FLOW_OK)) {
demux->download_start_time = g_get_monotonic_time ();
download_start_time = gst_util_get_timestamp ();
g_mutex_unlock (&demux->fragment_download_lock);
gst_element_sync_state_with_parent (demux->src);
g_mutex_lock (&demux->fragment_download_lock);
/* wait for the fragment to be completely downloaded */
GST_DEBUG_OBJECT (demux, "Waiting for fragment download to finish: %s",
next_fragment_uri);
g_cond_wait (&demux->fragment_download_cond,
&demux->fragment_download_lock);
while (!demux->stop_stream_task && !demux->download_finished) {
g_cond_wait (&demux->fragment_download_cond,
&demux->fragment_download_lock);
}
ret = demux->last_ret;
}
g_mutex_unlock (&demux->fragment_download_lock);
} else {
demux->last_ret = GST_FLOW_CUSTOM_ERROR;
g_mutex_lock (&demux->fragment_download_lock);
ret = demux->last_ret = GST_FLOW_CUSTOM_ERROR;
g_mutex_unlock (&demux->fragment_download_lock);
}
g_mutex_unlock (&demux->fragment_download_lock);
/* flush the proxypads so that the EOS state is reset */
gst_pad_push_event (demux->src_srcpad, gst_event_new_flush_start ());
gst_pad_push_event (demux->src_srcpad, gst_event_new_flush_stop (TRUE));
if (demux->last_ret != GST_FLOW_OK) {
if (ret != GST_FLOW_OK) {
gst_element_set_state (demux->src, GST_STATE_NULL);
if (*err == NULL) {
g_mutex_lock (&demux->fragment_download_lock);
if (demux->last_error) {
*err = demux->last_error;
demux->last_error = NULL;
@ -2151,6 +2197,7 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
*err = g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
"Failed to download fragment");
}
g_mutex_unlock (&demux->fragment_download_lock);
}
} else {
gst_element_set_state (demux->src, GST_STATE_READY);
@ -2174,7 +2221,7 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux,
/* clear the ghostpad eos state */
gst_hls_demux_stream_clear_eos_state (demux);
if (demux->last_ret != GST_FLOW_OK)
if (ret != GST_FLOW_OK)
return FALSE;
return TRUE;

View file

@ -119,6 +119,7 @@ struct _GstHLSDemux
GstElement *src;
GstPad *src_srcpad; /* handy link to src's src pad */
GMutex fragment_download_lock;
gboolean download_finished;
GCond fragment_download_cond;
GstClockTime current_timestamp;
GstClockTime current_duration;