decodebin3: Refactor parsebin output handling

* Instead of creating temporary `PendingPad` structures, always create a
  DecodebinInputStream for every pad of parsebin
* Remove never used `pending_stream` field from DecodebinInputStream
* When unblocking a given DecodebinInput (i.e. wrapping a parsebin), also make
  sure that other parsebins from the same GstStreamCollection are unblocked
  since they come from the same source

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>
This commit is contained in:
Edward Hervey 2022-11-05 09:16:41 +01:00 committed by GStreamer Marge Bot
parent d14c14dcc4
commit f091d2445f
2 changed files with 120 additions and 234 deletions

View file

@ -49,49 +49,36 @@ _custom_eos_quark_get (void)
return g_quark;
}
/* Streams that come from demuxers (input/upstream) */
/* Streams that come from parsebin */
/* FIXME : All this is hardcoded. Switch to tree of chains */
struct _DecodebinInputStream
{
GstDecodebin3 *dbin;
GstStream *pending_stream; /* Extra ref */
GstStream *active_stream;
DecodebinInput *input;
GstPad *srcpad; /* From demuxer */
GstPad *srcpad; /* From parsebin */
/* id of the pad event probe */
gulong output_event_probe_id;
/* id of the buffer blocking probe on the input (demuxer src) pad */
gulong input_buffer_probe_id;
/* id of the buffer blocking probe on the parsebin srcpad pad */
gulong buffer_probe_id;
/* Whether we saw an EOS on input. This should be treated accordingly
* when the stream is no longer used */
gboolean saw_eos;
};
static void unblock_pending_input (DecodebinInput * input,
gboolean unblock_other_inputs);
static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad,
DecodebinInput * input);
static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad,
DecodebinInput * input);
/* WITH SELECTION_LOCK TAKEN! */
static gboolean
pending_pads_are_eos (DecodebinInput * input)
{
GList *tmp;
for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
PendingPad *ppad = (PendingPad *) tmp->data;
if (ppad->saw_eos == FALSE)
return FALSE;
}
return TRUE;
}
/* WITH SELECTION_LOCK TAKEN! */
static gboolean
all_inputs_are_eos (GstDecodebin3 * dbin)
@ -104,13 +91,6 @@ all_inputs_are_eos (GstDecodebin3 * dbin)
return FALSE;
}
/* Check pending pads */
if (!pending_pads_are_eos (dbin->main_input))
return FALSE;
for (tmp = dbin->other_inputs; tmp; tmp = tmp->next)
if (!pending_pads_are_eos ((DecodebinInput *) tmp->data))
return FALSE;
GST_DEBUG_OBJECT (dbin, "All streams are EOS");
return TRUE;
}
@ -288,6 +268,15 @@ parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info,
}
}
break;
case GST_EVENT_GAP:
{
/* If we are still waiting to be unblocked and we get a gap, unblock */
if (input->buffer_probe_id) {
GST_DEBUG_OBJECT (pad, "Got a gap event! Unblocking input(s) !");
unblock_pending_input (input->input, TRUE);
}
break;
}
case GST_EVENT_CAPS:
{
GstCaps *caps = NULL;
@ -366,18 +355,26 @@ parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info,
return ret;
}
static DecodebinInputStream *
create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
static GstPadProbeReturn
parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
DecodebinInput * input)
{
/* We have at least one buffer pending, unblock parsebin(s) */
GST_DEBUG_OBJECT (pad, "Got a buffer ! unblocking");
unblock_pending_input (input, TRUE);
return GST_PAD_PROBE_OK;
}
static DecodebinInputStream *
create_input_stream (GstDecodebin3 * dbin, GstPad * pad, DecodebinInput * input)
{
DecodebinInputStream *res = g_new0 (DecodebinInputStream, 1);
GST_DEBUG_OBJECT (pad, "Creating input stream for stream %p %s (input:%p)",
stream, gst_stream_get_stream_id (stream), input);
GST_DEBUG_OBJECT (dbin, "Creating input stream for %" GST_PTR_FORMAT, pad);
res->dbin = dbin;
res->input = input;
res->pending_stream = gst_object_ref (stream);
res->srcpad = pad;
/* Put probe on output source pad (for detecting EOS/STREAM_START/FLUSH) */
@ -387,6 +384,12 @@ create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
| GST_PAD_PROBE_TYPE_EVENT_FLUSH,
(GstPadProbeCallback) parse_chain_output_probe, res, NULL);
/* Install a blocking buffer probe */
res->buffer_probe_id =
gst_pad_add_probe (pad,
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) parsebin_buffer_probe, input, NULL);
/* Add to list of current input streams */
SELECTION_LOCK (dbin);
dbin->input_streams = g_list_append (dbin->input_streams, res);
@ -406,6 +409,8 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
stream->active_stream ? gst_stream_get_stream_id (stream->active_stream) :
"<NONE>");
gst_object_replace ((GstObject **) & stream->active_stream, NULL);
/* Unlink from slot */
if (stream->srcpad) {
GstPad *peer;
@ -414,6 +419,8 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
gst_pad_unlink (stream->srcpad, peer);
gst_object_unref (peer);
}
if (stream->buffer_probe_id)
gst_pad_remove_probe (stream->srcpad, stream->buffer_probe_id);
}
slot = get_slot_for_input (dbin, stream);
@ -423,32 +430,31 @@ remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
GST_DEBUG_OBJECT (dbin, "slot %p cleared", slot);
}
if (stream->active_stream)
gst_object_unref (stream->active_stream);
if (stream->pending_stream)
gst_object_unref (stream->pending_stream);
dbin->input_streams = g_list_remove (dbin->input_streams, stream);
g_free (stream);
}
static void
unblock_pending_input (DecodebinInput * input)
unblock_pending_input (DecodebinInput * input, gboolean unblock_other_inputs)
{
GstDecodebin3 *dbin = input->dbin;
GList *tmp, *unused_slot = NULL;
/* 1. Re-use existing streams if/when possible */
GST_DEBUG_OBJECT (dbin,
"DecodebinInput for %" GST_PTR_FORMAT " , unblock_other_inputs:%d",
input->parsebin, unblock_other_inputs);
/* Re-use existing streams if/when possible */
GST_FIXME_OBJECT (dbin, "Re-use existing input streams if/when possible");
/* 2. Remove unused streams (push EOS) */
GST_DEBUG_OBJECT (dbin, "Removing unused streams");
/* Unblock all input streams and link to a slot if needed */
SELECTION_LOCK (dbin);
tmp = dbin->input_streams;
while (tmp != NULL) {
DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
GList *next = tmp->next;
MultiQueueSlot *slot;
if (input_stream->input != input) {
tmp = next;
@ -456,58 +462,33 @@ unblock_pending_input (DecodebinInput * input)
}
GST_DEBUG_OBJECT (dbin, "Checking input stream %p", input_stream);
if (input_stream->input_buffer_probe_id) {
if (!input_stream->active_stream)
input_stream->active_stream = gst_pad_get_stream (input_stream->srcpad);
/* Ensure the stream has an associated slot */
slot = get_slot_for_input (dbin, input_stream);
if (slot->input != input_stream)
link_input_to_slot (input_stream, slot);
if (input_stream->buffer_probe_id) {
GST_DEBUG_OBJECT (dbin,
"Removing pad block on input %p pad %" GST_PTR_FORMAT, input_stream,
input_stream->srcpad);
gst_pad_remove_probe (input_stream->srcpad,
input_stream->input_buffer_probe_id);
input_stream->buffer_probe_id);
input_stream->buffer_probe_id = 0;
}
input_stream->input_buffer_probe_id = 0;
if (input_stream->saw_eos) {
GST_DEBUG_OBJECT (dbin, "Removing EOS'd stream");
remove_input_stream (dbin, input_stream);
tmp = dbin->input_streams;
} else
tmp = next;
}
SELECTION_UNLOCK (dbin);
GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)");
/* 3. Create new streams */
for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
GstStream *stream;
PendingPad *ppad = (PendingPad *) tmp->data;
stream = gst_pad_get_stream (ppad->pad);
if (stream == NULL) {
GST_ERROR_OBJECT (dbin, "No stream for pad ????");
} else {
MultiQueueSlot *slot;
DecodebinInputStream *input_stream;
/* The remaining pads in pending_pads are the ones that require a new
* input stream */
input_stream = create_input_stream (dbin, stream, ppad->pad, ppad->input);
/* See if we can link it straight away */
input_stream->active_stream = stream;
SELECTION_LOCK (dbin);
slot = get_slot_for_input (dbin, input_stream);
link_input_to_slot (input_stream, slot);
SELECTION_UNLOCK (dbin);
/* Remove the buffer and event probe */
gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
gst_pad_remove_probe (ppad->pad, ppad->event_probe);
g_free (ppad);
}
}
g_list_free (input->pending_pads);
input->pending_pads = NULL;
/* Weed out unused multiqueue slots */
SELECTION_LOCK (dbin);
for (tmp = dbin->slots; tmp; tmp = tmp->next) {
MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
GST_LOG_OBJECT (dbin, "Slot %d input:%p", slot->id, slot->input);
@ -518,105 +499,43 @@ unblock_pending_input (DecodebinInput * input)
}
SELECTION_UNLOCK (dbin);
if (unused_slot) {
for (tmp = unused_slot; tmp; tmp = tmp->next) {
GstPad *sink_pad = (GstPad *) tmp->data;
GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot");
gst_pad_send_event (sink_pad, gst_event_new_eos ());
}
if (unused_slot)
g_list_free_full (unused_slot, (GDestroyNotify) gst_object_unref);
}
/* FIXME : HACK, REMOVE, USE INPUT CHAINS */
static GstPadProbeReturn
parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
DecodebinInput * input)
{
/* Any data out the demuxer means it's not creating pads
* any more right now */
GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !");
unblock_pending_input (input);
return GST_PAD_PROBE_OK;
if (unblock_other_inputs) {
GList *tmp;
/* If requrested, unblock inputs which are targetting the same collection */
if (dbin->main_input != input) {
if (dbin->main_input->collection == input->collection) {
GST_DEBUG_OBJECT (dbin, "Unblock main input");
unblock_pending_input (dbin->main_input, FALSE);
}
}
for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
DecodebinInput *other = tmp->data;
if (other->collection == input->collection) {
GST_DEBUG_OBJECT (dbin, "Unblock other input");
unblock_pending_input (other, FALSE);
}
static GstPadProbeReturn
parsebin_pending_event_probe (GstPad * pad, GstPadProbeInfo * info,
PendingPad * ppad)
{
GstDecodebin3 *dbin = ppad->dbin;
/* We drop all events by default */
GstPadProbeReturn ret = GST_PAD_PROBE_DROP;
GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
switch (GST_EVENT_TYPE (ev)) {
case GST_EVENT_EOS:
{
GST_DEBUG_OBJECT (pad, "Pending pad marked as EOS, removing");
ppad->input->pending_pads =
g_list_remove (ppad->input->pending_pads, ppad);
gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
gst_pad_remove_probe (ppad->pad, ppad->event_probe);
g_free (ppad);
SELECTION_LOCK (dbin);
check_all_streams_for_eos (dbin, ev);
SELECTION_UNLOCK (dbin);
}
break;
case GST_EVENT_GAP:
GST_DEBUG_OBJECT (pad, "Got a gap event! UNBLOCK !");
unblock_pending_input (ppad->input);
ret = GST_PAD_PROBE_OK;
break;
default:
break;
}
return ret;
}
static void
parsebin_pad_added_cb (GstElement * demux, GstPad * pad, DecodebinInput * input)
{
GstDecodebin3 *dbin = input->dbin;
PendingPad *ppad;
GList *tmp;
GST_DEBUG_OBJECT (dbin, "New pad %s:%s (input:%p)", GST_DEBUG_PAD_NAME (pad),
input);
ppad = g_new0 (PendingPad, 1);
ppad->dbin = dbin;
ppad->input = input;
ppad->pad = pad;
ppad->event_probe =
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
(GstPadProbeCallback) parsebin_pending_event_probe, ppad, NULL);
ppad->buffer_probe =
gst_pad_add_probe (pad,
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) parsebin_buffer_probe, input, NULL);
input->pending_pads = g_list_append (input->pending_pads, ppad);
/* Check if all existing input streams have a buffer probe set */
for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
if (input_stream->input == input &&
input_stream->input_buffer_probe_id == 0) {
GST_DEBUG_OBJECT (input_stream->srcpad, "Adding blocking buffer probe");
input_stream->input_buffer_probe_id =
gst_pad_add_probe (input_stream->srcpad,
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) parsebin_buffer_probe, input_stream->input,
NULL);
}
}
create_input_stream (dbin, pad, input);
}
static void
@ -624,33 +543,35 @@ parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
{
GstDecodebin3 *dbin = inp->dbin;
DecodebinInputStream *input = NULL;
MultiQueueSlot *slot;
GList *tmp;
GST_DEBUG_OBJECT (pad, "removed");
for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
DecodebinInputStream *cand = (DecodebinInputStream *) tmp->data;
if (cand->srcpad == pad)
if (cand->srcpad == pad) {
input = cand;
break;
}
}
g_assert (input);
/* If there are no pending pads, this means we will definitely not need this
* stream anymore */
if (input) {
GST_DEBUG_OBJECT (pad, "stream %p", input);
if (inp->pending_pads == NULL) {
MultiQueueSlot *slot;
GST_DEBUG_OBJECT (pad, "Remove input stream %p", input);
SELECTION_LOCK (dbin);
slot = get_slot_for_input (dbin, input);
input->srcpad = NULL;
remove_input_stream (dbin, input);
if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
/* if slot is still there and already drained, remove it in here */
if (slot->output) {
DecodebinOutputStream *output = slot->output;
GST_DEBUG_OBJECT (pad,
"Multiqueue was drained, Remove output stream");
GST_DEBUG_OBJECT (pad, "Multiqueue was drained, Remove output stream");
dbin->output_streams = g_list_remove (dbin->output_streams, output);
free_output_stream (dbin, output);
@ -663,11 +584,4 @@ parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
free_multiqueue_slot_async (dbin, slot);
}
SELECTION_UNLOCK (dbin);
} else {
input->srcpad = NULL;
if (input->input_buffer_probe_id)
gst_pad_remove_probe (pad, input->input_buffer_probe_id);
input->input_buffer_probe_id = 0;
}
}
}

View file

@ -303,10 +303,6 @@ struct _DecodebinInput
* FIXME : When do we reset it if re-used ?
*/
gboolean drained;
/* HACK : Remove these fields */
/* List of PendingPad structures */
GList *pending_pads;
};
/* Multiqueue Slots */
@ -362,18 +358,6 @@ struct _DecodebinOutputStream
gulong drop_probe_id;
};
/* Pending pads from parsebin */
typedef struct _PendingPad
{
GstDecodebin3 *dbin;
DecodebinInput *input;
GstPad *pad;
gulong buffer_probe;
gulong event_probe;
gboolean saw_eos;
} PendingPad;
/* properties */
enum
{
@ -1935,21 +1919,9 @@ check_all_slot_for_eos (GstDecodebin3 * dbin, GstEvent * ev)
break;
}
if (all_drained) {
INPUT_LOCK (dbin);
if (!pending_pads_are_eos (dbin->main_input))
all_drained = FALSE;
if (all_drained) {
for (iter = dbin->other_inputs; iter; iter = iter->next) {
if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
all_drained = FALSE;
break;
}
}
}
INPUT_UNLOCK (dbin);
}
/* Also check with the inputs, data might be pending */
if (all_drained)
all_drained = all_inputs_are_eos (dbin);
if (all_drained) {
GST_DEBUG_OBJECT (dbin,