decodebin3: Refactor incoming collection handling

Simplify its usage by having it directly create the message if the collection
changed. This is what caller were always doing and avoids releasing selection
locks yet-another-time

Also use it in more places to avoid code repetition

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6774>
This commit is contained in:
Edward Hervey 2024-03-19 11:27:23 +01:00 committed by GStreamer Marge Bot
parent 17e385a6ff
commit aeea856f4e

View file

@ -488,7 +488,7 @@ gst_decodebin3_select_stream (GstDecodebin3 * dbin,
static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
static void gst_decodebin3_release_pad (GstElement * element, GstPad * pad);
static gboolean handle_stream_collection (GstDecodebin3 * dbin,
static GstMessage *handle_stream_collection_locked (GstDecodebin3 * dbin,
GstStreamCollection * collection, DecodebinInput * input);
static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
@ -1160,7 +1160,6 @@ gst_decodebin3_release_pad (GstElement * element, GstPad * pad)
{
GstDecodebin3 *dbin = (GstDecodebin3 *) element;
DecodebinInput *input = g_object_get_data (G_OBJECT (pad), "decodebin.input");
GstStreamCollection *collection = NULL;
gulong probe_id = 0;
GstMessage *msg;
@ -1176,43 +1175,25 @@ gst_decodebin3_release_pad (GstElement * element, GstPad * pad)
input->collection = NULL;
}
SELECTION_LOCK (dbin);
collection = get_merged_collection (dbin);
if (!collection) {
SELECTION_UNLOCK (dbin);
goto beach;
}
if (collection == dbin->collection) {
SELECTION_UNLOCK (dbin);
gst_object_unref (collection);
goto beach;
msg = handle_stream_collection_locked (dbin, NULL, input);
if (msg) {
if (input->parsebin)
/* Drop duration queries that the application might be doing while this
* message is posted */
probe_id =
gst_pad_add_probe (input->parsebin_sink,
GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
(GstPadProbeCallback) query_duration_drop_probe, input, NULL);
gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
update_requested_selection (dbin);
if (input->parsebin) {
gst_pad_remove_probe (input->parsebin_sink, probe_id);
}
}
GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
if (dbin->collection)
gst_object_unref (dbin->collection);
dbin->collection = collection;
dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
msg =
gst_message_new_stream_collection ((GstObject *) dbin, dbin->collection);
if (input->parsebin)
/* Drop duration queries that the application might be doing while this message is posted */
probe_id = gst_pad_add_probe (input->parsebin_sink,
GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
(GstPadProbeCallback) query_duration_drop_probe, input, NULL);
SELECTION_UNLOCK (dbin);
gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
update_requested_selection (dbin);
if (input->parsebin) {
gst_pad_remove_probe (input->parsebin_sink, probe_id);
}
beach:
if (!input->is_main) {
dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
free_input (dbin, input);
@ -1429,22 +1410,14 @@ sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event)
gst_event_parse_stream_collection (event, &collection);
if (collection) {
gboolean post_collection;
GstMessage *collection_msg;
INPUT_LOCK (dbin);
post_collection = handle_stream_collection (dbin, collection, input);
collection_msg =
handle_stream_collection_locked (dbin, collection, input);
gst_object_unref (collection);
INPUT_UNLOCK (dbin);
SELECTION_LOCK (dbin);
/* Post the (potentially) updated collection */
if (post_collection) {
GstMessage *msg;
msg =
gst_message_new_stream_collection ((GstObject *) dbin,
dbin->collection);
SELECTION_UNLOCK (dbin);
gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
} else
SELECTION_UNLOCK (dbin);
if (collection_msg)
gst_element_post_message (GST_ELEMENT_CAST (dbin), collection_msg);
/* In all cases we want to make sure the selection is valid */
update_requested_selection (dbin);
@ -1966,14 +1939,25 @@ stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
return NULL;
}
/* Call with INPUT_LOCK taken.
* Returns TRUE if the dbin collection changed
/** handle_stream_collection_locked:
* @dbin:
* @collection: (transfer none): The new collection for @input. Can be %NULL.
* @input: The #DecodebinInput
*
* Called with INPUT_LOCK taken.
*
* Handle a new (or updated) @collection for the given @input. If this results
* in a different collection, the appropriate GST_MESSAGE_STREAM_COLLECTION to
* be posted will be returned.
*
* Returns: A #GstMessage to be posted on the bus if a new collection was
* generated, else %NULL.
*/
static gboolean
handle_stream_collection (GstDecodebin3 * dbin,
static GstMessage *
handle_stream_collection_locked (GstDecodebin3 * dbin,
GstStreamCollection * collection, DecodebinInput * input)
{
gboolean ret = TRUE;
GstMessage *message = NULL;
#ifndef GST_DISABLE_GST_DEBUG
const gchar *upstream_id;
guint i;
@ -1981,18 +1965,21 @@ handle_stream_collection (GstDecodebin3 * dbin,
if (!input) {
GST_DEBUG_OBJECT (dbin,
"Couldn't find corresponding input, most likely shutting down");
return FALSE;
return NULL;
}
/* Replace collection in input */
if (input->collection)
gst_object_unref (input->collection);
input->collection = gst_object_ref (collection);
if (collection)
input->collection = gst_object_ref (collection);
GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
input);
/* Merge collection if needed */
collection = get_merged_collection (dbin);
if (!collection)
return NULL;
#ifndef GST_DISABLE_GST_DEBUG
/* Just some debugging */
@ -2025,10 +2012,11 @@ handle_stream_collection (GstDecodebin3 * dbin,
if (dbin->collection == NULL) {
GST_DEBUG_OBJECT (dbin, "Storing updated global collection");
dbin->collection = collection;
message =
gst_message_new_stream_collection ((GstObject *) dbin, collection);
} else if (dbin->collection == collection) {
GST_DEBUG_OBJECT (dbin, "Collection didn't change");
gst_object_unref (collection);
ret = FALSE;
} else {
/* We need to check who emitted this collection (the owner).
* If we already had a collection from that user, this one is an update,
@ -2039,12 +2027,14 @@ handle_stream_collection (GstDecodebin3 * dbin,
* When all streams from active collection are drained in multiqueue output ? */
gst_object_unref (dbin->collection);
dbin->collection = collection;
message =
gst_message_new_stream_collection ((GstObject *) dbin, collection);
}
if (ret)
if (message)
dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
SELECTION_UNLOCK (dbin);
return ret;
return message;
}
/* Must be called with the selection lock taken */
@ -2119,6 +2109,7 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
{
GstStreamCollection *collection = NULL;
DecodebinInput *input;
GstMessage *collection_message;
INPUT_LOCK (dbin);
input =
@ -2137,25 +2128,21 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
goto drop_message;
}
gst_message_parse_stream_collection (message, &collection);
if (collection) {
posting_collection = handle_stream_collection (dbin, collection, input);
if (!collection) {
INPUT_UNLOCK (dbin);
break;
}
collection_message =
handle_stream_collection_locked (dbin, collection, input);
INPUT_UNLOCK (dbin);
SELECTION_LOCK (dbin);
if (posting_collection) {
/* Replace collection message, we most likely aggregated it */
GstMessage *new_msg;
new_msg =
gst_message_new_stream_collection ((GstObject *) dbin,
dbin->collection);
if (collection_message) {
posting_collection = TRUE;
gst_message_unref (message);
message = new_msg;
message = collection_message;
}
SELECTION_UNLOCK (dbin);
if (collection)
gst_object_unref (collection);
gst_object_unref (collection);
break;
}
case GST_MESSAGE_LATENCY: