decodebin3: Protect fields related to streams handling with the SELECTION_LOCK

Fields related to stream handling (input_streams,
output_streams, slots, guint slot_id) where used totally unprotected
until know.

This lead to several races, especially playing back RTSP streams.

To protect those fields, the OBJECT_LOCK can not be used as we sometimes
need to be able to post message on the bus while holding it.

decodebin3 already has a lock to manage stream selection, and in the end
it makes sense to protect all the stream management fields with the same
lock which is why we reuse the SELECTION_LOCK here.

https://bugzilla.gnome.org/show_bug.cgi?id=784012
This commit is contained in:
Thibault Saunier 2017-06-15 12:48:42 -04:00 committed by Edward Hervey
parent 1188345886
commit 4b3798fedc
2 changed files with 25 additions and 14 deletions

View file

@ -63,6 +63,7 @@ static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad,
static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad,
DecodebinInput * input); DecodebinInput * input);
/* WITH SELECTION_LOCK TAKEN! */
static gboolean static gboolean
pending_pads_are_eos (DecodebinInput * input) pending_pads_are_eos (DecodebinInput * input)
{ {
@ -77,6 +78,7 @@ pending_pads_are_eos (DecodebinInput * input)
return TRUE; return TRUE;
} }
/* WITH SELECTION_LOCK TAKEN! */
static gboolean static gboolean
all_inputs_are_eos (GstDecodebin3 * dbin) all_inputs_are_eos (GstDecodebin3 * dbin)
{ {
@ -99,6 +101,7 @@ all_inputs_are_eos (GstDecodebin3 * dbin)
return TRUE; return TRUE;
} }
/* WITH SELECTION_LOCK TAKEN! */
static void static void
check_all_streams_for_eos (GstDecodebin3 * dbin) check_all_streams_for_eos (GstDecodebin3 * dbin)
{ {
@ -253,7 +256,9 @@ parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info,
input->saw_eos = TRUE; input->saw_eos = TRUE;
if (all_inputs_are_eos (input->dbin)) { if (all_inputs_are_eos (input->dbin)) {
GST_DEBUG_OBJECT (pad, "real input pad, marking as EOS"); GST_DEBUG_OBJECT (pad, "real input pad, marking as EOS");
SELECTION_LOCK (input->dbin);
check_all_streams_for_eos (input->dbin); check_all_streams_for_eos (input->dbin);
SELECTION_UNLOCK (input->dbin);
} else { } else {
GstPad *peer = gst_pad_get_peer (input->srcpad); GstPad *peer = gst_pad_get_peer (input->srcpad);
if (peer) { if (peer) {
@ -337,12 +342,15 @@ create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
(GstPadProbeCallback) parse_chain_output_probe, res, NULL); (GstPadProbeCallback) parse_chain_output_probe, res, NULL);
/* Add to list of current input streams */ /* Add to list of current input streams */
SELECTION_LOCK (dbin);
dbin->input_streams = g_list_append (dbin->input_streams, res); dbin->input_streams = g_list_append (dbin->input_streams, res);
SELECTION_UNLOCK (dbin);
GST_DEBUG_OBJECT (pad, "Done creating input stream"); GST_DEBUG_OBJECT (pad, "Done creating input stream");
return res; return res;
} }
/* WITH SELECTION_LOCK TAKEN! */
static void static void
remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream) remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
{ {
@ -362,9 +370,7 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
} }
} }
g_mutex_lock (&dbin->selection_lock);
slot = get_slot_for_input (dbin, stream); slot = get_slot_for_input (dbin, stream);
g_mutex_unlock (&dbin->selection_lock);
if (slot) { if (slot) {
slot->pending_stream = NULL; slot->pending_stream = NULL;
slot->input = NULL; slot->input = NULL;
@ -390,8 +396,6 @@ parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
GstDecodebin3 *dbin = input->dbin; GstDecodebin3 *dbin = input->dbin;
GList *tmp, *unused_slot = NULL; GList *tmp, *unused_slot = NULL;
GST_FIXME_OBJECT (dbin, "Need a lock !");
GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !"); GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !");
/* Any data out the demuxer means it's not creating pads /* Any data out the demuxer means it's not creating pads
@ -402,6 +406,7 @@ parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
/* 2. Remove unused streams (push EOS) */ /* 2. Remove unused streams (push EOS) */
GST_DEBUG_OBJECT (dbin, "Removing unused streams"); GST_DEBUG_OBJECT (dbin, "Removing unused streams");
SELECTION_LOCK (dbin);
tmp = dbin->input_streams; tmp = dbin->input_streams;
while (tmp != NULL) { while (tmp != NULL) {
DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data; DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
@ -423,6 +428,7 @@ parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
} else } else
tmp = next; tmp = next;
} }
SELECTION_UNLOCK (dbin);
GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)"); GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)");
/* 3. Create new streams */ /* 3. Create new streams */
@ -575,11 +581,8 @@ parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
SELECTION_LOCK (dbin); SELECTION_LOCK (dbin);
slot = get_slot_for_input (dbin, input); slot = get_slot_for_input (dbin, input);
SELECTION_UNLOCK (dbin);
remove_input_stream (dbin, input); remove_input_stream (dbin, input);
SELECTION_LOCK (dbin);
if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) { if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
/* if slot is still there and already drained, remove it in here */ /* if slot is still there and already drained, remove it in here */
if (slot->output) { if (slot->output) {

View file

@ -207,16 +207,15 @@ struct _GstDecodebin3
GstElement *multiqueue; GstElement *multiqueue;
/* FIXME : Mutex for protecting values below */ /* selection_lock protects access to following variables */
GstStreamCollection *collection; /* Active collection */ GMutex selection_lock;
GList *input_streams; /* List of DecodebinInputStream for active collection */ GList *input_streams; /* List of DecodebinInputStream for active collection */
GList *output_streams; /* List of DecodebinOutputStream used for output */ GList *output_streams; /* List of DecodebinOutputStream used for output */
GList *slots; /* List of MultiQueueSlot */ GList *slots; /* List of MultiQueueSlot */
guint slot_id; guint slot_id;
/* selection_lock protects access to following variables */ /* Active collection */
GMutex selection_lock; GstStreamCollection *collection;
/* requested selection of stream-id to activate post-multiqueue */ /* requested selection of stream-id to activate post-multiqueue */
GList *requested_selection; GList *requested_selection;
/* list of stream-id currently activated in output */ /* list of stream-id currently activated in output */
@ -236,7 +235,6 @@ struct _GstDecodebin3
* FIXME : Is this really needed ? */ * FIXME : Is this really needed ? */
GList *pending_collection; GList *pending_collection;
/* Factories */ /* Factories */
GMutex factories_lock; GMutex factories_lock;
guint32 factories_cookie; guint32 factories_cookie;
@ -1264,6 +1262,7 @@ handle_stream_collection (GstDecodebin3 * dbin,
#endif #endif
/* Store collection for later usage */ /* Store collection for later usage */
SELECTION_LOCK (dbin);
if (dbin->collection == NULL) { if (dbin->collection == NULL) {
dbin->collection = collection; dbin->collection = collection;
} else { } else {
@ -1279,6 +1278,7 @@ handle_stream_collection (GstDecodebin3 * dbin,
/* dbin->pending_collection = */ /* dbin->pending_collection = */
/* g_list_append (dbin->pending_collection, collection); */ /* g_list_append (dbin->pending_collection, collection); */
} }
SELECTION_UNLOCK (dbin);
} }
static void static void
@ -1301,6 +1301,7 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
posting_collection = TRUE; posting_collection = TRUE;
INPUT_UNLOCK (dbin); INPUT_UNLOCK (dbin);
} }
SELECTION_LOCK (dbin); SELECTION_LOCK (dbin);
if (dbin->collection && collection != dbin->collection) { if (dbin->collection && collection != dbin->collection) {
/* Replace collection message, we most likely aggregated it */ /* Replace collection message, we most likely aggregated it */
@ -1312,6 +1313,7 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
message = new_msg; message = new_msg;
} }
SELECTION_UNLOCK (dbin); SELECTION_UNLOCK (dbin);
if (collection) if (collection)
gst_object_unref (collection); gst_object_unref (collection);
break; break;
@ -1651,8 +1653,9 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
} }
slot->probe_id = 0; slot->probe_id = 0;
dbin->slots = g_list_remove (dbin->slots, slot); dbin->slots = g_list_remove (dbin->slots, slot);
free_multiqueue_slot_async (dbin, slot);
SELECTION_UNLOCK (dbin); SELECTION_UNLOCK (dbin);
free_multiqueue_slot_async (dbin, slot);
ret = GST_PAD_PROBE_REMOVE; ret = GST_PAD_PROBE_REMOVE;
} }
break; break;
@ -1726,11 +1729,14 @@ create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
gst_stream_type_get_name (type)); gst_stream_type_get_name (type));
slot = g_new0 (MultiQueueSlot, 1); slot = g_new0 (MultiQueueSlot, 1);
slot->dbin = dbin; slot->dbin = dbin;
slot->id = dbin->slot_id++; slot->id = dbin->slot_id++;
slot->type = type; slot->type = type;
slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u"); slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
if (slot->sink_pad == NULL) if (slot->sink_pad == NULL)
goto fail; goto fail;
it = gst_pad_iterate_internal_links (slot->sink_pad); it = gst_pad_iterate_internal_links (slot->sink_pad);
if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
|| ((slot->src_pad = g_value_dup_object (&item)) == NULL)) { || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
@ -1751,7 +1757,9 @@ create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot, GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
GST_DEBUG_PAD_NAME (slot->src_pad)); GST_DEBUG_PAD_NAME (slot->src_pad));
dbin->slots = g_list_append (dbin->slots, slot); dbin->slots = g_list_append (dbin->slots, slot);
return slot; return slot;
/* ERRORS */ /* ERRORS */