adaptivedemux: remove some deadlocks using webkitwebsrc.

WebKit's websrc depends on the main-thread for download completion
rendezvous. This exposed a number of deadlocks in adaptivedemux due to
it holding the MANIFEST_LOCK during network requests, and also needing
to hold it to change_state and resolve queries, which frequently occur
during these download windows.

Make demux->running MT-safe so that it can be accessed without using the
MANIFEST_LOCK. In case a source is downloading and requires a MT-thread
notification for completion of the fragment download, a state change
during this download window will deadlock unless we cancel the downloads
and ensure they are not restarted before we finish the state-change.

Also make demux->priv->have_manifest MT-safe. A duration query happening
in the window described above can deadlock for the same reason. Other
src queries (like SEEKING) that happen in this window also could
deadlock, but I haven't hit this scenario.

Increase granularity of API_LOCK'ing in change_state as well. We need to
cancel downloads before trying to take this lock, since sink events
(EOS) will hold it before starting a fragment download.
This commit is contained in:
Charlie Turner 2019-07-02 12:27:40 +01:00
parent e898f1565d
commit 659d76a633
3 changed files with 46 additions and 24 deletions

View file

@ -73,6 +73,7 @@ static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux,
gboolean update, GError ** err);
static gchar *gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf);
/* FIXME: the return value is never used? */
static gboolean gst_hls_demux_change_playlist (GstHLSDemux * demux,
guint max_bitrate, gboolean * changed);
static GstBuffer *gst_hls_demux_decrypt_fragment (GstHLSDemux * demux,
@ -1173,6 +1174,8 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux)
{
GstHLSDemux *demux = GST_HLS_DEMUX_CAST (ademux);
GST_DEBUG_OBJECT (demux, "resetting");
GST_M3U8_CLIENT_LOCK (hlsdemux->client);
if (demux->master) {
gst_hls_master_playlist_unref (demux->master);
@ -1394,7 +1397,8 @@ retry:
if (download == NULL) {
gchar *base_uri;
if (!update || main_checked || demux->master->is_simple) {
if (!update || main_checked || demux->master->is_simple
|| !gst_adaptive_demux_is_running (GST_ADAPTIVE_DEMUX_CAST (demux))) {
g_free (uri);
return FALSE;
}
@ -1627,7 +1631,7 @@ retry_failover_protection:
if (changed)
*changed = TRUE;
stream->discont = TRUE;
} else {
} else if (gst_adaptive_demux_is_running (GST_ADAPTIVE_DEMUX_CAST (demux))) {
GstHLSVariantStream *failover_variant = NULL;
GList *failover;

View file

@ -166,7 +166,7 @@ enum
struct _GstAdaptiveDemuxPrivate
{
GstAdapter *input_adapter; /* protected by manifest_lock */
gboolean have_manifest; /* protected by manifest_lock */
gint have_manifest; /* MT safe */
GList *old_streams; /* protected by manifest_lock */
@ -564,25 +564,31 @@ gst_adaptive_demux_change_state (GstElement * element,
GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
GST_API_LOCK (demux);
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
gst_uri_downloader_cancel (demux->downloader);
GST_API_LOCK (demux);
GST_MANIFEST_LOCK (demux);
demux->running = FALSE;
gst_adaptive_demux_reset (demux);
GST_MANIFEST_UNLOCK (demux);
GST_API_UNLOCK (demux);
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_API_LOCK (demux);
GST_MANIFEST_LOCK (demux);
gst_adaptive_demux_reset (demux);
/* Clear "cancelled" flag in uridownloader since subclass might want to
* use uridownloader to fetch another manifest */
gst_uri_downloader_reset (demux->downloader);
if (demux->priv->have_manifest)
if (g_atomic_int_get (&demux->priv->have_manifest))
gst_adaptive_demux_start_manifest_update_task (demux);
demux->running = TRUE;
GST_MANIFEST_UNLOCK (demux);
GST_API_UNLOCK (demux);
if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
GST_DEBUG_OBJECT (demux, "demuxer has started running");
break;
default:
break;
@ -595,7 +601,6 @@ gst_adaptive_demux_change_state (GstElement * element,
*/
result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
GST_API_UNLOCK (demux);
return result;
}
@ -686,7 +691,7 @@ gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
(NULL));
ret = FALSE;
} else {
demux->priv->have_manifest = TRUE;
g_atomic_int_set (&demux->priv->have_manifest, TRUE);
}
gst_buffer_unref (manifest_buffer);
@ -829,7 +834,7 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
demux->manifest_base_uri = NULL;
gst_adapter_clear (demux->priv->input_adapter);
demux->priv->have_manifest = FALSE;
g_atomic_int_set (&demux->priv->have_manifest, FALSE);
gst_segment_init (&demux->segment, GST_FORMAT_TIME);
@ -1051,7 +1056,7 @@ gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
demux->prepared_streams = demux->next_streams;
demux->next_streams = NULL;
if (!demux->running) {
if (!gst_adaptive_demux_is_running (demux)) {
GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
return TRUE;
}
@ -1843,7 +1848,7 @@ gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
if (stream) {
if (!stream->cancelled && demux->running &&
if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
stream->last_ret == GST_FLOW_NOT_LINKED) {
stream->last_ret = GST_FLOW_OK;
stream->restart_download = TRUE;
@ -1916,9 +1921,8 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
gst_query_parse_duration (query, &fmt, NULL);
GST_MANIFEST_LOCK (demux);
if (fmt == GST_FORMAT_TIME && demux->priv->have_manifest) {
if (fmt == GST_FORMAT_TIME
&& g_atomic_int_get (&demux->priv->have_manifest)) {
duration = demux_class->get_duration (demux);
if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
@ -1927,8 +1931,6 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
}
}
GST_MANIFEST_UNLOCK (demux);
GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
break;
@ -1943,15 +1945,14 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
gint64 stop = -1;
gint64 start = 0;
GST_MANIFEST_LOCK (demux);
if (!demux->priv->have_manifest) {
GST_MANIFEST_UNLOCK (demux);
if (!g_atomic_int_get (&demux->priv->have_manifest)) {
GST_INFO_OBJECT (demux,
"Don't have manifest yet, can't answer seeking query");
return FALSE; /* can't answer without manifest */
}
GST_MANIFEST_LOCK (demux);
gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
if (fmt == GST_FORMAT_TIME) {
@ -2013,7 +2014,7 @@ gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
{
GList *iter;
if (!demux->running) {
if (!gst_adaptive_demux_is_running (demux)) {
GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
return;
}
@ -4487,6 +4488,20 @@ gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
return g_date_time_new_from_timeval_utc (&gtv);
}
/**
* gst_adaptive_demux_is_running
* @demux: #GstAdaptiveDemux
* Returns: whether the demuxer is processing data
*
* Returns FALSE if shutdown has started (transitioning down from
* PAUSED), otherwise TRUE.
*/
gboolean
gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
{
return g_atomic_int_get (&demux->running);
}
static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
{

View file

@ -207,7 +207,7 @@ struct _GstAdaptiveDemux
/*< private >*/
GstBin bin;
gboolean running;
gint running;
gsize stream_struct_size;
@ -529,6 +529,9 @@ GstClockTime gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux);
GST_ADAPTIVE_DEMUX_API
GDateTime *gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux);
GST_ADAPTIVE_DEMUX_API
gboolean gst_adaptive_demux_is_running (GstAdaptiveDemux * demux);
G_END_DECLS
#endif