From f091d2445f61766c01c7a431e2d0a12ff3127903 Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Sat, 5 Nov 2022 09:16:41 +0100 Subject: [PATCH] decodebin3: Refactor parsebin output handling * Instead of creating temporary `PendingPad` structures, always create a DecodebinInputStream for every pad of parsebin * Remove never used `pending_stream` field from DecodebinInputStream * When unblocking a given DecodebinInput (i.e. wrapping a parsebin), also make sure that other parsebins from the same GstStreamCollection are unblocked since they come from the same source Part-of: --- .../gst/playback/gstdecodebin3-parse.c | 320 +++++++----------- .../gst/playback/gstdecodebin3.c | 34 +- 2 files changed, 120 insertions(+), 234 deletions(-) diff --git a/subprojects/gst-plugins-base/gst/playback/gstdecodebin3-parse.c b/subprojects/gst-plugins-base/gst/playback/gstdecodebin3-parse.c index 6675aae9d5..b4007e5ad9 100644 --- a/subprojects/gst-plugins-base/gst/playback/gstdecodebin3-parse.c +++ b/subprojects/gst-plugins-base/gst/playback/gstdecodebin3-parse.c @@ -49,49 +49,36 @@ _custom_eos_quark_get (void) return g_quark; } -/* Streams that come from demuxers (input/upstream) */ +/* Streams that come from parsebin */ /* FIXME : All this is hardcoded. Switch to tree of chains */ struct _DecodebinInputStream { GstDecodebin3 *dbin; - GstStream *pending_stream; /* Extra ref */ + GstStream *active_stream; DecodebinInput *input; - GstPad *srcpad; /* From demuxer */ + GstPad *srcpad; /* From parsebin */ /* id of the pad event probe */ gulong output_event_probe_id; - /* id of the buffer blocking probe on the input (demuxer src) pad */ - gulong input_buffer_probe_id; + /* id of the buffer blocking probe on the parsebin srcpad pad */ + gulong buffer_probe_id; /* Whether we saw an EOS on input. This should be treated accordingly * when the stream is no longer used */ gboolean saw_eos; }; +static void unblock_pending_input (DecodebinInput * input, + gboolean unblock_other_inputs); static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad, DecodebinInput * input); static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * input); -/* WITH SELECTION_LOCK TAKEN! */ -static gboolean -pending_pads_are_eos (DecodebinInput * input) -{ - GList *tmp; - - for (tmp = input->pending_pads; tmp; tmp = tmp->next) { - PendingPad *ppad = (PendingPad *) tmp->data; - if (ppad->saw_eos == FALSE) - return FALSE; - } - - return TRUE; -} - /* WITH SELECTION_LOCK TAKEN! */ static gboolean all_inputs_are_eos (GstDecodebin3 * dbin) @@ -104,13 +91,6 @@ all_inputs_are_eos (GstDecodebin3 * dbin) return FALSE; } - /* Check pending pads */ - if (!pending_pads_are_eos (dbin->main_input)) - return FALSE; - for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) - if (!pending_pads_are_eos ((DecodebinInput *) tmp->data)) - return FALSE; - GST_DEBUG_OBJECT (dbin, "All streams are EOS"); return TRUE; } @@ -288,6 +268,15 @@ parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info, } } break; + case GST_EVENT_GAP: + { + /* If we are still waiting to be unblocked and we get a gap, unblock */ + if (input->buffer_probe_id) { + GST_DEBUG_OBJECT (pad, "Got a gap event! Unblocking input(s) !"); + unblock_pending_input (input->input, TRUE); + } + break; + } case GST_EVENT_CAPS: { GstCaps *caps = NULL; @@ -366,18 +355,26 @@ parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info, return ret; } -static DecodebinInputStream * -create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad, +static GstPadProbeReturn +parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info, DecodebinInput * input) +{ + /* We have at least one buffer pending, unblock parsebin(s) */ + GST_DEBUG_OBJECT (pad, "Got a buffer ! unblocking"); + unblock_pending_input (input, TRUE); + + return GST_PAD_PROBE_OK; +} + +static DecodebinInputStream * +create_input_stream (GstDecodebin3 * dbin, GstPad * pad, DecodebinInput * input) { DecodebinInputStream *res = g_new0 (DecodebinInputStream, 1); - GST_DEBUG_OBJECT (pad, "Creating input stream for stream %p %s (input:%p)", - stream, gst_stream_get_stream_id (stream), input); + GST_DEBUG_OBJECT (dbin, "Creating input stream for %" GST_PTR_FORMAT, pad); res->dbin = dbin; res->input = input; - res->pending_stream = gst_object_ref (stream); res->srcpad = pad; /* Put probe on output source pad (for detecting EOS/STREAM_START/FLUSH) */ @@ -387,6 +384,12 @@ create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad, | GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) parse_chain_output_probe, res, NULL); + /* Install a blocking buffer probe */ + res->buffer_probe_id = + gst_pad_add_probe (pad, + GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) parsebin_buffer_probe, input, NULL); + /* Add to list of current input streams */ SELECTION_LOCK (dbin); dbin->input_streams = g_list_append (dbin->input_streams, res); @@ -406,6 +409,8 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream) stream->active_stream ? gst_stream_get_stream_id (stream->active_stream) : ""); + gst_object_replace ((GstObject **) & stream->active_stream, NULL); + /* Unlink from slot */ if (stream->srcpad) { GstPad *peer; @@ -414,6 +419,8 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream) gst_pad_unlink (stream->srcpad, peer); gst_object_unref (peer); } + if (stream->buffer_probe_id) + gst_pad_remove_probe (stream->srcpad, stream->buffer_probe_id); } slot = get_slot_for_input (dbin, stream); @@ -423,32 +430,31 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream) GST_DEBUG_OBJECT (dbin, "slot %p cleared", slot); } - if (stream->active_stream) - gst_object_unref (stream->active_stream); - if (stream->pending_stream) - gst_object_unref (stream->pending_stream); - dbin->input_streams = g_list_remove (dbin->input_streams, stream); g_free (stream); } static void -unblock_pending_input (DecodebinInput * input) +unblock_pending_input (DecodebinInput * input, gboolean unblock_other_inputs) { GstDecodebin3 *dbin = input->dbin; GList *tmp, *unused_slot = NULL; - /* 1. Re-use existing streams if/when possible */ + GST_DEBUG_OBJECT (dbin, + "DecodebinInput for %" GST_PTR_FORMAT " , unblock_other_inputs:%d", + input->parsebin, unblock_other_inputs); + + /* Re-use existing streams if/when possible */ GST_FIXME_OBJECT (dbin, "Re-use existing input streams if/when possible"); - /* 2. Remove unused streams (push EOS) */ - GST_DEBUG_OBJECT (dbin, "Removing unused streams"); + /* Unblock all input streams and link to a slot if needed */ SELECTION_LOCK (dbin); tmp = dbin->input_streams; while (tmp != NULL) { DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data; GList *next = tmp->next; + MultiQueueSlot *slot; if (input_stream->input != input) { tmp = next; @@ -456,58 +462,33 @@ unblock_pending_input (DecodebinInput * input) } GST_DEBUG_OBJECT (dbin, "Checking input stream %p", input_stream); - if (input_stream->input_buffer_probe_id) { + + if (!input_stream->active_stream) + input_stream->active_stream = gst_pad_get_stream (input_stream->srcpad); + + /* Ensure the stream has an associated slot */ + slot = get_slot_for_input (dbin, input_stream); + if (slot->input != input_stream) + link_input_to_slot (input_stream, slot); + + if (input_stream->buffer_probe_id) { GST_DEBUG_OBJECT (dbin, "Removing pad block on input %p pad %" GST_PTR_FORMAT, input_stream, input_stream->srcpad); gst_pad_remove_probe (input_stream->srcpad, - input_stream->input_buffer_probe_id); + input_stream->buffer_probe_id); + input_stream->buffer_probe_id = 0; } - input_stream->input_buffer_probe_id = 0; if (input_stream->saw_eos) { + GST_DEBUG_OBJECT (dbin, "Removing EOS'd stream"); remove_input_stream (dbin, input_stream); tmp = dbin->input_streams; } else tmp = next; } - SELECTION_UNLOCK (dbin); - - GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)"); - /* 3. Create new streams */ - for (tmp = input->pending_pads; tmp; tmp = tmp->next) { - GstStream *stream; - PendingPad *ppad = (PendingPad *) tmp->data; - - stream = gst_pad_get_stream (ppad->pad); - if (stream == NULL) { - GST_ERROR_OBJECT (dbin, "No stream for pad ????"); - } else { - MultiQueueSlot *slot; - DecodebinInputStream *input_stream; - /* The remaining pads in pending_pads are the ones that require a new - * input stream */ - input_stream = create_input_stream (dbin, stream, ppad->pad, ppad->input); - /* See if we can link it straight away */ - input_stream->active_stream = stream; - - SELECTION_LOCK (dbin); - slot = get_slot_for_input (dbin, input_stream); - link_input_to_slot (input_stream, slot); - SELECTION_UNLOCK (dbin); - - /* Remove the buffer and event probe */ - gst_pad_remove_probe (ppad->pad, ppad->buffer_probe); - gst_pad_remove_probe (ppad->pad, ppad->event_probe); - g_free (ppad); - } - } - - g_list_free (input->pending_pads); - input->pending_pads = NULL; /* Weed out unused multiqueue slots */ - SELECTION_LOCK (dbin); for (tmp = dbin->slots; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; GST_LOG_OBJECT (dbin, "Slot %d input:%p", slot->id, slot->input); @@ -518,105 +499,43 @@ unblock_pending_input (DecodebinInput * input) } SELECTION_UNLOCK (dbin); - for (tmp = unused_slot; tmp; tmp = tmp->next) { - GstPad *sink_pad = (GstPad *) tmp->data; - GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot"); - gst_pad_send_event (sink_pad, gst_event_new_eos ()); - } - - if (unused_slot) - g_list_free_full (unused_slot, (GDestroyNotify) gst_object_unref); - -} - -/* FIXME : HACK, REMOVE, USE INPUT CHAINS */ -static GstPadProbeReturn -parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info, - DecodebinInput * input) -{ - /* Any data out the demuxer means it's not creating pads - * any more right now */ - GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !"); - unblock_pending_input (input); - - return GST_PAD_PROBE_OK; -} - -static GstPadProbeReturn -parsebin_pending_event_probe (GstPad * pad, GstPadProbeInfo * info, - PendingPad * ppad) -{ - GstDecodebin3 *dbin = ppad->dbin; - /* We drop all events by default */ - GstPadProbeReturn ret = GST_PAD_PROBE_DROP; - GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info); - - GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev)); - switch (GST_EVENT_TYPE (ev)) { - case GST_EVENT_EOS: - { - GST_DEBUG_OBJECT (pad, "Pending pad marked as EOS, removing"); - ppad->input->pending_pads = - g_list_remove (ppad->input->pending_pads, ppad); - gst_pad_remove_probe (ppad->pad, ppad->buffer_probe); - gst_pad_remove_probe (ppad->pad, ppad->event_probe); - g_free (ppad); - - SELECTION_LOCK (dbin); - check_all_streams_for_eos (dbin, ev); - SELECTION_UNLOCK (dbin); + if (unused_slot) { + for (tmp = unused_slot; tmp; tmp = tmp->next) { + GstPad *sink_pad = (GstPad *) tmp->data; + GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot"); + gst_pad_send_event (sink_pad, gst_event_new_eos ()); } - break; - case GST_EVENT_GAP: - GST_DEBUG_OBJECT (pad, "Got a gap event! UNBLOCK !"); - unblock_pending_input (ppad->input); - ret = GST_PAD_PROBE_OK; - break; - default: - break; + g_list_free_full (unused_slot, (GDestroyNotify) gst_object_unref); } - return ret; + if (unblock_other_inputs) { + GList *tmp; + /* If requrested, unblock inputs which are targetting the same collection */ + if (dbin->main_input != input) { + if (dbin->main_input->collection == input->collection) { + GST_DEBUG_OBJECT (dbin, "Unblock main input"); + unblock_pending_input (dbin->main_input, FALSE); + } + } + for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) { + DecodebinInput *other = tmp->data; + if (other->collection == input->collection) { + GST_DEBUG_OBJECT (dbin, "Unblock other input"); + unblock_pending_input (other, FALSE); + } + } + } } static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad, DecodebinInput * input) { GstDecodebin3 *dbin = input->dbin; - PendingPad *ppad; - GList *tmp; GST_DEBUG_OBJECT (dbin, "New pad %s:%s (input:%p)", GST_DEBUG_PAD_NAME (pad), input); - ppad = g_new0 (PendingPad, 1); - ppad->dbin = dbin; - ppad->input = input; - ppad->pad = pad; - - ppad->event_probe = - gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, - (GstPadProbeCallback) parsebin_pending_event_probe, ppad, NULL); - ppad->buffer_probe = - gst_pad_add_probe (pad, - GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER, - (GstPadProbeCallback) parsebin_buffer_probe, input, NULL); - - input->pending_pads = g_list_append (input->pending_pads, ppad); - - /* Check if all existing input streams have a buffer probe set */ - for (tmp = dbin->input_streams; tmp; tmp = tmp->next) { - DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data; - if (input_stream->input == input && - input_stream->input_buffer_probe_id == 0) { - GST_DEBUG_OBJECT (input_stream->srcpad, "Adding blocking buffer probe"); - input_stream->input_buffer_probe_id = - gst_pad_add_probe (input_stream->srcpad, - GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER, - (GstPadProbeCallback) parsebin_buffer_probe, input_stream->input, - NULL); - } - } + create_input_stream (dbin, pad, input); } static void @@ -624,50 +543,45 @@ parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp) { GstDecodebin3 *dbin = inp->dbin; DecodebinInputStream *input = NULL; + MultiQueueSlot *slot; GList *tmp; + GST_DEBUG_OBJECT (pad, "removed"); for (tmp = dbin->input_streams; tmp; tmp = tmp->next) { DecodebinInputStream *cand = (DecodebinInputStream *) tmp->data; - if (cand->srcpad == pad) + if (cand->srcpad == pad) { input = cand; - } - /* If there are no pending pads, this means we will definitely not need this - * stream anymore */ - if (input) { - GST_DEBUG_OBJECT (pad, "stream %p", input); - if (inp->pending_pads == NULL) { - MultiQueueSlot *slot; - - GST_DEBUG_OBJECT (pad, "Remove input stream %p", input); - - SELECTION_LOCK (dbin); - slot = get_slot_for_input (dbin, input); - - remove_input_stream (dbin, input); - if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) { - /* if slot is still there and already drained, remove it in here */ - if (slot->output) { - DecodebinOutputStream *output = slot->output; - GST_DEBUG_OBJECT (pad, - "Multiqueue was drained, Remove output stream"); - - dbin->output_streams = g_list_remove (dbin->output_streams, output); - free_output_stream (dbin, output); - } - GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot"); - if (slot->probe_id) - gst_pad_remove_probe (slot->src_pad, slot->probe_id); - slot->probe_id = 0; - dbin->slots = g_list_remove (dbin->slots, slot); - free_multiqueue_slot_async (dbin, slot); - } - SELECTION_UNLOCK (dbin); - } else { - input->srcpad = NULL; - if (input->input_buffer_probe_id) - gst_pad_remove_probe (pad, input->input_buffer_probe_id); - input->input_buffer_probe_id = 0; + break; } } + g_assert (input); + + /* If there are no pending pads, this means we will definitely not need this + * stream anymore */ + + GST_DEBUG_OBJECT (pad, "Remove input stream %p", input); + + SELECTION_LOCK (dbin); + slot = get_slot_for_input (dbin, input); + input->srcpad = NULL; + remove_input_stream (dbin, input); + + if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) { + /* if slot is still there and already drained, remove it in here */ + if (slot->output) { + DecodebinOutputStream *output = slot->output; + GST_DEBUG_OBJECT (pad, "Multiqueue was drained, Remove output stream"); + + dbin->output_streams = g_list_remove (dbin->output_streams, output); + free_output_stream (dbin, output); + } + GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot"); + if (slot->probe_id) + gst_pad_remove_probe (slot->src_pad, slot->probe_id); + slot->probe_id = 0; + dbin->slots = g_list_remove (dbin->slots, slot); + free_multiqueue_slot_async (dbin, slot); + } + SELECTION_UNLOCK (dbin); } diff --git a/subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c b/subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c index 0a1437d181..274b84cce7 100644 --- a/subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c +++ b/subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c @@ -303,10 +303,6 @@ struct _DecodebinInput * FIXME : When do we reset it if re-used ? */ gboolean drained; - - /* HACK : Remove these fields */ - /* List of PendingPad structures */ - GList *pending_pads; }; /* Multiqueue Slots */ @@ -362,18 +358,6 @@ struct _DecodebinOutputStream gulong drop_probe_id; }; -/* Pending pads from parsebin */ -typedef struct _PendingPad -{ - GstDecodebin3 *dbin; - DecodebinInput *input; - GstPad *pad; - - gulong buffer_probe; - gulong event_probe; - gboolean saw_eos; -} PendingPad; - /* properties */ enum { @@ -1935,21 +1919,9 @@ check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev) break; } - if (all_drained) { - INPUT_LOCK (dbin); - if (!pending_pads_are_eos (dbin->main_input)) - all_drained = FALSE; - - if (all_drained) { - for (iter = dbin->other_inputs; iter; iter = iter->next) { - if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) { - all_drained = FALSE; - break; - } - } - } - INPUT_UNLOCK (dbin); - } + /* Also check with the inputs, data might be pending */ + if (all_drained) + all_drained = all_inputs_are_eos (dbin); if (all_drained) { GST_DEBUG_OBJECT (dbin,