decodebin3: Refactor removal of slot/output from streaming thread

The code was identical in several places

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7002>
This commit is contained in:
Edward Hervey 2024-03-21 12:17:25 +01:00 committed by Backport Bot
parent b6263febe0
commit 13407a11d6

View file

@ -560,8 +560,6 @@ static MultiQueueSlot
static void link_input_to_slot (DecodebinInputStream * input,
MultiQueueSlot * slot);
static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
MultiQueueSlot * slot);
static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
static void update_requested_selection (GstDecodebin3 * dbin);
@ -1357,6 +1355,75 @@ find_input_stream_for_pad (GstDecodebin3 * dbin, GstPad * pad)
return NULL;
}
/* Must be called with the selection lock taken */
static void
gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
{
GstClockTime max_latency = GST_CLOCK_TIME_NONE;
GList *tmp;
GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
if (max_latency == GST_CLOCK_TIME_NONE
|| out->decoder_latency > max_latency)
max_latency = out->decoder_latency;
}
}
GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
GST_TIME_ARGS (max_latency));
if (!GST_CLOCK_TIME_IS_VALID (max_latency))
return;
/* Make sure we keep an extra overhead */
max_latency += 100 * GST_MSECOND;
if (max_latency == dbin->current_mq_min_interleave)
return;
dbin->current_mq_min_interleave = max_latency;
GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
GST_TIME_ARGS (dbin->current_mq_min_interleave));
g_object_set (dbin->multiqueue, "min-interleave-time",
dbin->current_mq_min_interleave, NULL);
}
/** remove_slot_from_streaming_thread:
* @dbin: A #GstDecodebin3
* @slot: The #MultiQueueslot to remove
*
* Remove a #MultiQueueslot and associated output. Call this when done from a
* multiqueue streaming thread.
*
* Must be called with the SELECTION_LOCK taken.
*/
static void
remove_slot_from_streaming_thread (GstDecodebin3 * dbin, MultiQueueSlot * slot)
{
/* if slot is still there and already drained, remove it in here */
if (slot->output) {
DecodebinOutputStream *output = slot->output;
GST_DEBUG_OBJECT (slot->src_pad,
"Multiqueue slot is drained, Remove output stream");
dbin->output_streams = g_list_remove (dbin->output_streams, output);
free_output_stream (dbin, output);
}
GST_DEBUG_OBJECT (slot->src_pad, "No pending pad, Remove multiqueue slot");
if (slot->probe_id)
gst_pad_remove_probe (slot->src_pad, slot->probe_id);
slot->probe_id = 0;
dbin->slots = g_list_remove (dbin->slots, slot);
/* The minimum interleave might have changed, recalculate it */
gst_decodebin3_update_min_interleave (dbin);
gst_element_call_async (GST_ELEMENT_CAST (dbin),
(GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
}
static void
parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
{
@ -1387,22 +1454,9 @@ parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
slot = gst_decodebin_get_slot_for_input_stream_locked (dbin, input);
remove_input_stream (dbin, input);
if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
/* if slot is still there and already drained, remove it in here */
if (slot->output) {
DecodebinOutputStream *output = slot->output;
GST_DEBUG_OBJECT (pad, "Multiqueue was drained, Remove output stream");
if (slot && slot->is_drained)
remove_slot_from_streaming_thread (dbin, slot);
dbin->output_streams = g_list_remove (dbin->output_streams, output);
free_output_stream (dbin, output);
}
GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot");
if (slot->probe_id)
gst_pad_remove_probe (slot->src_pad, slot->probe_id);
slot->probe_id = 0;
dbin->slots = g_list_remove (dbin->slots, slot);
free_multiqueue_slot_async (dbin, slot);
}
SELECTION_UNLOCK (dbin);
}
@ -2702,40 +2756,6 @@ handle_stream_collection_locked (GstDecodebin3 * dbin,
return message;
}
/* Must be called with the selection lock taken */
static void
gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
{
GstClockTime max_latency = GST_CLOCK_TIME_NONE;
GList *tmp;
GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
if (max_latency == GST_CLOCK_TIME_NONE
|| out->decoder_latency > max_latency)
max_latency = out->decoder_latency;
}
}
GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
GST_TIME_ARGS (max_latency));
if (!GST_CLOCK_TIME_IS_VALID (max_latency))
return;
/* Make sure we keep an extra overhead */
max_latency += 100 * GST_MSECOND;
if (max_latency == dbin->current_mq_min_interleave)
return;
dbin->current_mq_min_interleave = max_latency;
GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
GST_TIME_ARGS (dbin->current_mq_min_interleave));
g_object_set (dbin->multiqueue, "min-interleave-time",
dbin->current_mq_min_interleave, NULL);
}
static void
gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
{
@ -2855,6 +2875,7 @@ drop_message:
}
}
/* Called with SELECTION_LOCK taken */
static void
handle_stored_latency_message (GstDecodebin3 * dbin,
DecodebinOutputStream * output, CandidateDecoder * candidate)
@ -3244,19 +3265,8 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
SELECTION_LOCK (dbin);
if (slot->input == NULL) {
GST_DEBUG_OBJECT (pad,
"Got custom-eos from null input stream, remove output stream");
/* Remove the output */
if (slot->output) {
DecodebinOutputStream *output = slot->output;
dbin->output_streams =
g_list_remove (dbin->output_streams, output);
free_output_stream (dbin, output);
/* Reacalculate min interleave */
gst_decodebin3_update_min_interleave (dbin);
}
slot->probe_id = 0;
dbin->slots = g_list_remove (dbin->slots, slot);
free_multiqueue_slot_async (dbin, slot);
"Got custom-eos from null input stream, removing slot");
remove_slot_from_streaming_thread (dbin, slot);
ret = GST_PAD_PROBE_REMOVE;
} else if (!was_drained) {
check_and_drain_multiqueue_locked (dbin, ev);
@ -3278,24 +3288,14 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
gst_pad_send_event (peer, gst_event_ref (ev));
gst_object_unref (peer);
}
SELECTION_LOCK (dbin);
/* FIXME : Shouldn't we try to re-assign the output instead of just
* removing it ? */
/* Remove the output */
if (slot->output) {
DecodebinOutputStream *output = slot->output;
dbin->output_streams = g_list_remove (dbin->output_streams, output);
free_output_stream (dbin, output);
}
slot->probe_id = 0;
dbin->slots = g_list_remove (dbin->slots, slot);
SELECTION_UNLOCK (dbin);
SELECTION_LOCK (dbin);
/* FIXME: Removing the slot is async, which means actually
* unlinking the pad is async. Other things like stream-start
* might flow through this (now unprobed) link before it actually
* gets released */
free_multiqueue_slot_async (dbin, slot);
remove_slot_from_streaming_thread (dbin, slot);
SELECTION_UNLOCK (dbin);
ret = GST_PAD_PROBE_REMOVE;
} else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
CUSTOM_FINAL_EOS_QUARK)) {
@ -4282,14 +4282,6 @@ free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
g_free (slot);
}
static void
free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
{
GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
gst_element_call_async (GST_ELEMENT_CAST (dbin),
(GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
}
/* Create a DecodebinOutputStream for a given type
* Note: It will be empty initially, it needs to be configured
* afterwards */