decodebin3: Refactor slot/output (re)configuration

* Re-use existing function where possible
* Only set/reset keyframe probe at unique places

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6774>
This commit is contained in:
Edward Hervey 2024-03-24 12:51:43 +01:00 committed by GStreamer Marge Bot
parent faaedd2bb9
commit 49cd8213bf

View file

@ -545,8 +545,8 @@ static void gst_decodebin_input_unblock_streams (DecodebinInput * input,
gboolean unblock_other_inputs);
static void gst_decodebin_input_link_to_slot (DecodebinInputStream * input);
static gboolean reconfigure_output_stream (DecodebinOutputStream * output,
MultiQueueSlot * slot, GstMessage ** msg);
static gboolean db_output_stream_reconfigure (DecodebinOutputStream * output,
GstMessage ** msg);
static void db_output_stream_reset (DecodebinOutputStream * output);
static void db_output_stream_free (DecodebinOutputStream * output);
static DecodebinOutputStream *db_output_stream_new (GstDecodebin3 * dbin,
@ -1529,67 +1529,6 @@ parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
}
}
static gboolean
clear_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
{
GST_DEBUG_OBJECT (pad, "clearing sticky event %" GST_PTR_FORMAT, *event);
gst_event_unref (*event);
*event = NULL;
return TRUE;
}
static gboolean
copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
{
GstPad *gpad = GST_PAD_CAST (user_data);
GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
gst_pad_store_sticky_event (gpad, *event);
return TRUE;
}
static gboolean
decode_pad_set_target (GstGhostPad * pad, GstPad * target)
{
gboolean res = gst_ghost_pad_set_target (pad, target);
if (!res)
return res;
if (target == NULL)
gst_pad_sticky_events_foreach (GST_PAD_CAST (pad), clear_sticky_events,
NULL);
else
gst_pad_sticky_events_foreach (target, copy_sticky_events, pad);
return res;
}
static CandidateDecoder *
add_candidate_decoder (GstDecodebin3 * dbin, GstElement * element)
{
GST_OBJECT_LOCK (dbin);
CandidateDecoder *candidate;
candidate = g_new0 (CandidateDecoder, 1);
candidate->element = element;
dbin->candidate_decoders =
g_list_prepend (dbin->candidate_decoders, candidate);
GST_OBJECT_UNLOCK (dbin);
return candidate;
}
static void
remove_candidate_decoder (GstDecodebin3 * dbin, CandidateDecoder * candidate)
{
GST_OBJECT_LOCK (dbin);
dbin->candidate_decoders =
g_list_remove (dbin->candidate_decoders, candidate);
if (candidate->error)
gst_message_unref (candidate->error);
g_free (candidate);
GST_OBJECT_UNLOCK (dbin);
}
/** gst_decodebin_input_ensure_parsebin:
* @input: A #DecodebinInput
*
@ -3183,16 +3122,27 @@ no_more_streams_locked (GstDecodebin3 * dbin)
&& !dbin->to_activate);
}
/** mq_slot_check_reconfiguration:
* @slot: A #MultiQueueSlot
*
* Check if the @slot output needs to be (re)configured:
* * Should an output be created/setup ?
* * Should the associated output be reconfigured ?
*
* Will also handle missing streams message emission
*/
static void
check_slot_reconfiguration (GstDecodebin3 * dbin, MultiQueueSlot * slot)
mq_slot_check_reconfiguration (MultiQueueSlot * slot)
{
GstDecodebin3 *dbin = slot->dbin;
DecodebinOutputStream *output;
GstMessage *msg = NULL;
gboolean no_more_streams;
SELECTION_LOCK (dbin);
output = get_output_for_slot (slot);
output = mq_slot_get_or_create_output (slot);
if (!output) {
/* Slot is not used. */
no_more_streams = no_more_streams_locked (dbin);
SELECTION_UNLOCK (dbin);
if (no_more_streams)
@ -3201,7 +3151,7 @@ check_slot_reconfiguration (GstDecodebin3 * dbin, MultiQueueSlot * slot)
return;
}
if (!reconfigure_output_stream (output, slot, &msg)) {
if (!db_output_stream_reconfigure (output, &msg)) {
GST_DEBUG_OBJECT (dbin, "Removing failing stream from selection: %s ",
gst_stream_get_stream_id (slot->active_stream));
slot->dbin->requested_selection =
@ -3224,6 +3174,8 @@ check_slot_reconfiguration (GstDecodebin3 * dbin, MultiQueueSlot * slot)
mq_slot_reassign (slot);
} else {
GstMessage *selection_msg = is_selection_done (dbin);
/* All good, we reconfigured the associated output. Check if we're done with
* the current selection */
SELECTION_UNLOCK (dbin);
if (selection_msg)
gst_element_post_message ((GstElement *) slot->dbin, selection_msg);
@ -3301,7 +3253,7 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
case GST_EVENT_CAPS:
{
/* Configure the output slot if needed */
check_slot_reconfiguration (dbin, slot);
mq_slot_check_reconfiguration (slot);
}
break;
case GST_EVENT_EOS:
@ -3597,256 +3549,218 @@ keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
return GST_PAD_PROBE_DROP;
}
static void
remove_decoder_link (DecodebinOutputStream * output, MultiQueueSlot * slot)
static gboolean
clear_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
{
GstDecodebin3 *dbin = output->dbin;
if (gst_pad_is_linked (output->decoder_sink)) {
gst_pad_unlink (slot->src_pad, output->decoder_sink);
}
if (output->drop_probe_id) {
gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
output->drop_probe_id = 0;
}
gst_element_set_locked_state (output->decoder, TRUE);
gst_element_set_state (output->decoder, GST_STATE_NULL);
gst_bin_remove ((GstBin *) dbin, output->decoder);
output->decoder = NULL;
GST_DEBUG_OBJECT (pad, "clearing sticky event %" GST_PTR_FORMAT, *event);
gst_event_unref (*event);
*event = NULL;
return TRUE;
}
/* Returns FALSE if the output couldn't be properly configured and the
* associated GstStreams should be disabled */
static gboolean
reconfigure_output_stream (DecodebinOutputStream * output,
MultiQueueSlot * slot, GstMessage ** msg)
copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
{
GstDecodebin3 *dbin = output->dbin;
GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
gboolean needs_decoder;
gboolean ret = TRUE;
GstPad *gpad = GST_PAD_CAST (user_data);
needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
gst_pad_store_sticky_event (gpad, *event);
GST_DEBUG_OBJECT (dbin,
"Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
needs_decoder);
return TRUE;
}
/* FIXME : Maybe make the output un-hook itself automatically ? */
if (output->slot != NULL && output->slot != slot) {
GST_WARNING_OBJECT (dbin,
"Output still linked to another slot (%p)", output->slot);
gst_caps_unref (new_caps);
return ret;
}
static gboolean
decode_pad_set_target (GstGhostPad * pad, GstPad * target)
{
gboolean res = gst_ghost_pad_set_target (pad, target);
if (!res)
return res;
/* Check if existing config is reusable as-is by checking if
* the existing decoder accepts the new caps, if not delete
* it and create a new one */
if (output->decoder) {
gboolean can_reuse_decoder;
if (target == NULL)
gst_pad_sticky_events_foreach (GST_PAD_CAST (pad), clear_sticky_events,
NULL);
else
gst_pad_sticky_events_foreach (target, copy_sticky_events, pad);
if (needs_decoder) {
can_reuse_decoder =
gst_pad_query_accept_caps (output->decoder_sink, new_caps);
} else
can_reuse_decoder = FALSE;
return res;
}
if (can_reuse_decoder) {
if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
output->drop_probe_id =
gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
}
GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
if (output->linked == FALSE) {
gst_pad_link_full (slot->src_pad, output->decoder_sink,
GST_PAD_LINK_CHECK_NOTHING);
output->linked = TRUE;
}
gst_caps_unref (new_caps);
return ret;
}
static void
db_output_stream_expose_src_pad (DecodebinOutputStream * output)
{
MultiQueueSlot *slot = output->slot;
GstEvent *stream_start;
GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
if (output->src_exposed)
return;
if (output->linked)
gst_pad_unlink (slot->src_pad, output->decoder_sink);
output->linked = FALSE;
if (output->drop_probe_id) {
gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
output->drop_probe_id = 0;
}
stream_start =
gst_pad_get_sticky_event (slot->src_pad, GST_EVENT_STREAM_START, 0);
if (!decode_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
gst_caps_unref (new_caps);
goto cleanup;
}
gst_element_set_locked_state (output->decoder, TRUE);
gst_element_set_state (output->decoder, GST_STATE_NULL);
gst_bin_remove ((GstBin *) dbin, output->decoder);
output->decoder = NULL;
output->decoder_latency = GST_CLOCK_TIME_NONE;
} else if (output->linked) {
/* Otherwise if we have no decoder yet but the output is linked make
* sure that the ghost pad is really unlinked in case no decoder was
* needed previously */
if (!decode_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
gst_caps_unref (new_caps);
goto cleanup;
}
}
gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
gst_object_replace ((GstObject **) & output->decoder_src, NULL);
/* If a decoder is required, create one */
if (needs_decoder) {
GList *factories, *next_factory;
factories = next_factory = create_decoder_factory_list (dbin, new_caps);
if (!next_factory) {
GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
new_caps);
g_assert (output->decoder == NULL);
ret = FALSE;
goto missing_decoder;
}
while (next_factory) {
gboolean decoder_failed = FALSE;
CandidateDecoder *candidate = NULL;
/* If we don't have a decoder yet, instantiate one */
output->decoder = gst_element_factory_create ((GstElementFactory *)
next_factory->data, NULL);
GST_DEBUG ("Trying decoder %" GST_PTR_FORMAT, output->decoder);
if (output->decoder == NULL)
goto try_next;
if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
GST_WARNING_OBJECT (dbin, "could not add decoder '%s' to pipeline",
GST_ELEMENT_NAME (output->decoder));
goto try_next;
}
output->decoder_sink =
gst_element_get_static_pad (output->decoder, "sink");
output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
if (output->type & GST_STREAM_TYPE_VIDEO) {
GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
output->drop_probe_id =
gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
}
candidate = add_candidate_decoder (dbin, output->decoder);
if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
GST_WARNING_OBJECT (dbin, "could not link to %s:%s",
GST_DEBUG_PAD_NAME (output->decoder_sink));
decoder_failed = TRUE;
goto try_next;
}
if (gst_element_set_state (output->decoder,
GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
GST_WARNING_OBJECT (dbin,
"Decoder '%s' failed to reach READY state",
GST_ELEMENT_NAME (output->decoder));
decoder_failed = TRUE;
goto try_next;
}
if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
GST_DEBUG_OBJECT (dbin,
"Decoder '%s' did not accept the caps, trying the next type",
GST_ELEMENT_NAME (output->decoder));
decoder_failed = TRUE;
goto try_next;
}
if (gst_element_set_state (output->decoder,
GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE) {
GST_WARNING_OBJECT (dbin, "Decoder '%s' failed to reach PAUSED state",
GST_ELEMENT_NAME (output->decoder));
decoder_failed = TRUE;
goto try_next;
} else {
/* Everything went well */
output->linked = TRUE;
GST_DEBUG ("created decoder %" GST_PTR_FORMAT, output->decoder);
handle_stored_latency_message (dbin, output, candidate);
remove_candidate_decoder (dbin, candidate);
}
break;
try_next:
{
if (decoder_failed)
remove_decoder_link (output, slot);
if (candidate)
remove_candidate_decoder (dbin, candidate);
if (!next_factory->next) {
ret = FALSE;
if (!decoder_failed)
goto cleanup;
if (output->decoder == NULL)
goto missing_decoder;
} else {
next_factory = next_factory->next;
}
}
}
gst_plugin_feature_list_free (factories);
/* Ensure GstStream is accesiable from pad-added callback */
if (stream_start) {
gst_pad_store_sticky_event (output->src_pad, stream_start);
gst_event_unref (stream_start);
} else {
GST_WARNING_OBJECT (slot->src_pad, "Pad has no stored stream-start event");
}
output->src_exposed = TRUE;
gst_element_add_pad (GST_ELEMENT_CAST (slot->dbin), output->src_pad);
}
static CandidateDecoder *
add_candidate_decoder (GstDecodebin3 * dbin, GstElement * element)
{
GST_OBJECT_LOCK (dbin);
CandidateDecoder *candidate;
candidate = g_new0 (CandidateDecoder, 1);
candidate->element = element;
dbin->candidate_decoders =
g_list_prepend (dbin->candidate_decoders, candidate);
GST_OBJECT_UNLOCK (dbin);
return candidate;
}
static void
remove_candidate_decoder (GstDecodebin3 * dbin, CandidateDecoder * candidate)
{
GST_OBJECT_LOCK (dbin);
dbin->candidate_decoders =
g_list_remove (dbin->candidate_decoders, candidate);
if (candidate->error)
gst_message_unref (candidate->error);
g_free (candidate);
GST_OBJECT_UNLOCK (dbin);
}
/** db_output_stream_setup_decoder:
* @output: A #DecodebinOutputStream
* @caps: (transfer none): The #GstCaps for which we want a decoder
* @msg: A pointer to a #GstMessage
*
* Finds the appropriate decoder for @caps and sets it up. If the @caps match
* the decodebin output caps, it will be configured to propagate the stream
* as-is without any decoder.
*
* Returns: #TRUE if a decoder was found and properly setup, else #FALSE. If the
* failure was due to missing plugins, then @msg will be properly filled up.
*/
static gboolean
db_output_stream_setup_decoder (DecodebinOutputStream * output,
GstCaps * new_caps, GstMessage ** msg)
{
gboolean ret = TRUE;
GstDecodebin3 *dbin = output->dbin;
MultiQueueSlot *slot = output->slot;
GList *factories, *next_factory;
GST_DEBUG_OBJECT (dbin, "output %s:%s caps %" GST_PTR_FORMAT,
GST_DEBUG_PAD_NAME (output->src_pad), new_caps);
/* If no decoder is required, use the slot source pad and we're done */
if (gst_caps_can_intersect (new_caps, dbin->caps)) {
output->decoder_src = gst_object_ref (slot->src_pad);
output->decoder_sink = NULL;
goto done;
}
gst_caps_unref (new_caps);
if (!decode_pad_set_target ((GstGhostPad *) output->src_pad,
output->decoder_src)) {
GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
factories = next_factory = create_decoder_factory_list (dbin, new_caps);
if (!next_factory) {
GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT, new_caps);
g_assert (output->decoder == NULL);
ret = FALSE;
goto cleanup;
goto missing_decoder;
}
output->linked = TRUE;
while (next_factory) {
CandidateDecoder *candidate = NULL;
if (output->src_exposed == FALSE) {
GstEvent *stream_start;
/* If we don't have a decoder yet, instantiate one */
output->decoder = gst_element_factory_create (
(GstElementFactory *) next_factory->data, NULL);
GST_DEBUG ("Trying decoder %" GST_PTR_FORMAT, output->decoder);
stream_start = gst_pad_get_sticky_event (slot->src_pad,
GST_EVENT_STREAM_START, 0);
if (output->decoder == NULL)
goto try_next;
/* Ensure GstStream is accesiable from pad-added callback */
if (stream_start) {
gst_pad_store_sticky_event (output->src_pad, stream_start);
gst_event_unref (stream_start);
} else {
GST_WARNING_OBJECT (slot->src_pad,
"Pad has no stored stream-start event");
if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
GST_WARNING_OBJECT (dbin, "could not add decoder '%s' to pipeline",
GST_ELEMENT_NAME (output->decoder));
goto try_next;
}
output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
candidate = add_candidate_decoder (dbin, output->decoder);
if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
GST_WARNING_OBJECT (dbin, "could not link to %s:%s",
GST_DEBUG_PAD_NAME (output->decoder_sink));
goto try_next;
}
output->linked = TRUE;
if (gst_element_set_state (output->decoder, GST_STATE_READY) ==
GST_STATE_CHANGE_FAILURE) {
GST_WARNING_OBJECT (dbin, "Decoder '%s' failed to reach READY state",
GST_ELEMENT_NAME (output->decoder));
goto try_next;
}
output->src_exposed = TRUE;
gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
GST_DEBUG_OBJECT (dbin,
"Decoder '%s' did not accept the caps, trying the next type",
GST_ELEMENT_NAME (output->decoder));
goto try_next;
}
if (gst_element_set_state (output->decoder, GST_STATE_PAUSED) ==
GST_STATE_CHANGE_FAILURE) {
GST_WARNING_OBJECT (dbin, "Decoder '%s' failed to reach PAUSED state",
GST_ELEMENT_NAME (output->decoder));
goto try_next;
}
/* Everything went well, we have a decoder */
GST_DEBUG ("created decoder %" GST_PTR_FORMAT, output->decoder);
handle_stored_latency_message (dbin, output, candidate);
remove_candidate_decoder (dbin, candidate);
break;
try_next:{
db_output_stream_reset (output);
if (candidate)
remove_candidate_decoder (dbin, candidate);
if (!next_factory->next) {
ret = FALSE;
if (output->decoder == NULL)
goto missing_decoder;
goto cleanup;
}
next_factory = next_factory->next;
}
}
gst_plugin_feature_list_free (factories);
done:
if (output->type & GST_STREAM_TYPE_VIDEO) {
GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
output->drop_probe_id =
gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
}
/* Set the decode pad target */
decode_pad_set_target ((GstGhostPad *) output->src_pad, output->decoder_src);
/* Expose the source pad if needed */
db_output_stream_expose_src_pad (output);
if (output->decoder)
gst_element_sync_state_with_parent (output->decoder);
output->slot = slot;
return ret;
missing_decoder:
@ -3858,6 +3772,8 @@ missing_decoder:
"We are missing a decoder for %" GST_PTR_FORMAT, caps);
*msg = gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps);
gst_caps_unref (caps);
/* FALLTHROUGH */
}
cleanup:
@ -3880,12 +3796,72 @@ cleanup:
}
}
/** db_output_stream_reconfigure:
* @output: A #DecodebinOutputStream
* @msg: A pointer to a #GstMessage
*
* (Re)Configure the @output for the associated slot active stream.
*
* Returns: #TRUE if the output was properly (re)configured. #FALSE if it
* failed, in which case the stream shouldn't be used and the @msg might contain
* a message to be posted on the bus.
*/
static gboolean
db_output_stream_reconfigure (DecodebinOutputStream * output, GstMessage ** msg)
{
MultiQueueSlot *slot = output->slot;
GstDecodebin3 *dbin = output->dbin;
GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
gboolean needs_decoder;
gboolean ret = TRUE;
needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
GST_DEBUG_OBJECT (dbin,
"Reconfiguring output %s:%s to slot %s:%s, needs_decoder:%d",
GST_DEBUG_PAD_NAME (output->src_pad), GST_DEBUG_PAD_NAME (slot->src_pad),
needs_decoder);
/* First check if we can re-use the output as-is for the new caps:
* * Either we have a decoder and it can accept the new caps
* * Or we don't have one and don't need one
*/
/* If we need a decoder and the existing one can accept the new caps, re-use it */
if (needs_decoder && output->decoder &&
gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
GST_DEBUG_OBJECT (dbin,
"Reusing existing decoder '%" GST_PTR_FORMAT "' for slot %p",
output->decoder, slot);
/* Re-add the keyframe-waiter probe */
if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
output->drop_probe_id =
gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
}
if (output->linked == FALSE) {
gst_pad_link_full (slot->src_pad, output->decoder_sink,
GST_PAD_LINK_CHECK_NOTHING);
output->linked = TRUE;
}
} else {
/* We need to reset the output and set it up again */
db_output_stream_reset (output);
/* Setup the decoder */
ret = db_output_stream_setup_decoder (output, new_caps, msg);
}
gst_caps_unref (new_caps);
return ret;
}
static GstPadProbeReturn
idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
{
GstDecodebin3 *dbin = slot->dbin;
check_slot_reconfiguration (dbin, slot);
mq_slot_check_reconfiguration (slot);
return GST_PAD_PROBE_REMOVE;
}