decodebin3: Make update/posting of collection messages atomic

The presence (or not) of a collection on an input will determine whether events
will be throttled so that there are only forwarded when that input gets a valid
collection.

Therefore the input lock should be used.

In addition to that, we want to ensure that the application/user has a chance to
reliably (i.e. synchronously) specify what streams it is interested in by
sending a GST_EVENT_SELECT_STREAMS.

But we cannot allow anything to go forward until that message posting has come
back, otherwise we run in various races.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/3872

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7594>
This commit is contained in:
Edward Hervey 2024-10-01 17:17:01 +02:00 committed by GStreamer Marge Bot
parent 120f2a6b83
commit 9a59fc7168

View file

@ -260,6 +260,10 @@ struct _GstDecodebin3
guint32 input_counter; guint32 input_counter;
/* Current stream group_id (default : GST_GROUP_ID_INVALID) */ /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
guint32 current_group_id; guint32 current_group_id;
/* Whether decodebin is currently posting a stream collection on the bus */
gboolean posting_collection;
/* GCond to block against and know when posting_collection changed */
GCond posting_cond;
/* End of variables protected by input_lock */ /* End of variables protected by input_lock */
GstElement *multiqueue; GstElement *multiqueue;
@ -712,6 +716,8 @@ gst_decodebin3_init (GstDecodebin3 * dbin)
g_mutex_init (&dbin->factories_lock); g_mutex_init (&dbin->factories_lock);
g_mutex_init (&dbin->selection_lock); g_mutex_init (&dbin->selection_lock);
g_mutex_init (&dbin->input_lock); g_mutex_init (&dbin->input_lock);
g_cond_init (&dbin->posting_cond);
dbin->posting_collection = FALSE;
dbin->caps = gst_static_caps_get (&default_raw_caps); dbin->caps = gst_static_caps_get (&default_raw_caps);
@ -802,6 +808,7 @@ gst_decodebin3_finalize (GObject * object)
g_mutex_clear (&dbin->factories_lock); g_mutex_clear (&dbin->factories_lock);
g_mutex_clear (&dbin->selection_lock); g_mutex_clear (&dbin->selection_lock);
g_mutex_clear (&dbin->input_lock); g_mutex_clear (&dbin->input_lock);
g_cond_clear (&dbin->posting_cond);
G_OBJECT_CLASS (parent_class)->finalize (object); G_OBJECT_CLASS (parent_class)->finalize (object);
} }
@ -1992,13 +1999,29 @@ sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
gst_event_parse_stream_collection (event, &collection); gst_event_parse_stream_collection (event, &collection);
if (collection) { if (collection) {
GstMessage *collection_msg; GstMessage *collection_msg;
/* If we post a collection, we need to release the input lock, but we
* want to ensure code that needs to check of validity of the input
* collection to wait until the message was properly posted so that any
* synchronous handler can properly set their requested stream
* selection. */
INPUT_LOCK (dbin); INPUT_LOCK (dbin);
while (dbin->posting_collection)
g_cond_wait (&dbin->posting_cond, &dbin->input_lock);
collection_msg = collection_msg =
handle_stream_collection_locked (dbin, collection, input); handle_stream_collection_locked (dbin, collection, input);
gst_object_unref (collection); gst_object_unref (collection);
if (collection_msg) {
GST_DEBUG_OBJECT (sinkpad, "Posting collection");
dbin->posting_collection = TRUE;
INPUT_UNLOCK (dbin); INPUT_UNLOCK (dbin);
if (collection_msg)
gst_element_post_message (GST_ELEMENT_CAST (dbin), collection_msg); gst_element_post_message (GST_ELEMENT_CAST (dbin), collection_msg);
INPUT_LOCK (dbin);
dbin->posting_collection = FALSE;
GST_DEBUG_OBJECT (sinkpad, "Done posting collection");
g_cond_broadcast (&dbin->posting_cond);
}
INPUT_UNLOCK (dbin);
} }
/* If we are waiting to create an identity passthrough, do it now */ /* If we are waiting to create an identity passthrough, do it now */
@ -2013,6 +2036,7 @@ sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
/* Drain all pending events */ /* Drain all pending events */
if (input->events_waiting_for_collection) { if (input->events_waiting_for_collection) {
GST_DEBUG_OBJECT (sinkpad, "Draining pending events");
GList *tmp; GList *tmp;
for (tmp = input->events_waiting_for_collection; tmp; tmp = tmp->next) for (tmp = input->events_waiting_for_collection; tmp; tmp = tmp->next)
gst_pad_event_default (sinkpad, GST_OBJECT (dbin), tmp->data); gst_pad_event_default (sinkpad, GST_OBJECT (dbin), tmp->data);
@ -2100,13 +2124,22 @@ sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
/* For parsed inputs, if we are waiting for a collection event, store them for /* For parsed inputs, if we are waiting for a collection event, store them for
* now */ * now */
if (!input->collection && input->input_is_parsed) { if (input->input_is_parsed) {
gboolean have_collection;
INPUT_LOCK (dbin);
while (dbin->posting_collection) {
g_cond_wait (&dbin->posting_cond, &dbin->input_lock);
}
have_collection = (input->collection != NULL);
INPUT_UNLOCK (dbin);
if (!have_collection) {
GST_DEBUG_OBJECT (sinkpad, GST_DEBUG_OBJECT (sinkpad,
"Postponing event until we get a stream collection"); "Postponing event until we get a stream collection");
input->events_waiting_for_collection = input->events_waiting_for_collection =
g_list_append (input->events_waiting_for_collection, event); g_list_append (input->events_waiting_for_collection, event);
return TRUE; return TRUE;
} }
}
/* Chain to parent function */ /* Chain to parent function */
return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event); return gst_pad_event_default (sinkpad, GST_OBJECT (dbin), event);