decodebin3: Remove custom EOS handling

Initially introduced in 2017 by 4fcbcf4e48 (and
follow up commits) and ec7d81f67c

See https://bugzilla.gnome.org/show_bug.cgi?id=773341 and
https://bugzilla.gnome.org/show_bug.cgi?id=792693 for more details.

This was made to support the legacy behaviour of adaptive demuxers (regarding
streams added/removed dynamically).

Since then, a lot of things have changed in decodebin3 and related elements
regarding how dynamic streams are handled and this custom behaviour is no longer
needed.

This also removes weird behaviours like EOS being delayed until *all* streams
were EOS, which could cause deadlocks downstream.

Fixes #3666

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7194>
This commit is contained in:
Edward Hervey 2024-07-18 09:26:36 +02:00 committed by GStreamer Marge Bot
parent ddd00a9e1d
commit 701165227f

View file

@ -201,35 +201,6 @@ GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
#define EXTRA_DEBUG 1 #define EXTRA_DEBUG 1
#define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
#define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
static GQuark
_custom_final_eos_quark_get (void)
{
static gsize g_quark;
if (g_once_init_enter (&g_quark)) {
gsize quark =
(gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
g_once_init_leave (&g_quark, quark);
}
return g_quark;
}
#define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
#define CUSTOM_EOS_QUARK_DATA "custom-eos"
static GQuark
_custom_eos_quark_get (void)
{
static gsize g_quark;
if (g_once_init_enter (&g_quark)) {
gsize quark = (gsize) g_quark_from_static_string ("decodebin3-custom-eos");
g_once_init_leave (&g_quark, quark);
}
return g_quark;
}
typedef struct _GstDecodebin3 GstDecodebin3; typedef struct _GstDecodebin3 GstDecodebin3;
typedef struct _GstDecodebin3Class GstDecodebin3Class; typedef struct _GstDecodebin3Class GstDecodebin3Class;
@ -873,74 +844,6 @@ gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
} }
} }
/* WITH SELECTION_LOCK TAKEN! */
static gboolean
all_input_streams_are_eos (GstDecodebin3 * dbin)
{
GList *tmp;
/* First check input streams */
for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
DecodebinInputStream *input = (DecodebinInputStream *) tmp->data;
if (input->saw_eos == FALSE)
return FALSE;
}
GST_DEBUG_OBJECT (dbin, "All input streams are EOS");
return TRUE;
}
/** check_all_input_streams_for_eos:
* @dbin: A #GstDecodebin3
* @eos_event: (transfer none): The GST_EVENT_EOS that triggered this check
*
* Check if all inputs streams are EOS. If they are propagates the @eos_event to all
* #DecodebinInputstream pads.
*
* Returns: #TRUE if all pads are EOS and the event was propagated, else #FALSE.
*/
static gboolean
check_all_input_streams_for_eos (GstDecodebin3 * dbin, GstEvent * eos_event)
{
GList *tmp;
GList *outputpads = NULL;
SELECTION_LOCK (dbin);
if (!all_input_streams_are_eos (dbin)) {
SELECTION_UNLOCK (dbin);
return FALSE;
}
/* We know all streams are EOS, properly clean up everything */
/* We grab all peer pads *while* the selection lock is taken and then we will
push EOS downstream with the selection lock released */
for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
DecodebinInputStream *input = (DecodebinInputStream *) tmp->data;
GstPad *peer = gst_pad_get_peer (input->srcpad);
/* Keep a reference to the peer pad */
if (peer)
outputpads = g_list_append (outputpads, peer);
}
SELECTION_UNLOCK (dbin);
for (tmp = outputpads; tmp; tmp = tmp->next) {
GstPad *peer = (GstPad *) tmp->data;
/* Send EOS and then remove elements */
gst_pad_send_event (peer, gst_event_ref (eos_event));
GST_FIXME_OBJECT (peer, "Remove input stream");
gst_object_unref (peer);
}
g_list_free (outputpads);
return TRUE;
}
/* Get the intersection of parser caps and available (sorted) decoders */ /* Get the intersection of parser caps and available (sorted) decoders */
static GstCaps * static GstCaps *
get_parser_caps_filter (GstDecodebin3 * dbin, GstCaps * caps) get_parser_caps_filter (GstDecodebin3 * dbin, GstCaps * caps)
@ -1095,28 +998,6 @@ gst_decodebin_input_stream_src_probe (GstPad * pad, GstPadProbeInfo * info,
{ {
GST_DEBUG_OBJECT (pad, "Marking input as EOS"); GST_DEBUG_OBJECT (pad, "Marking input as EOS");
input->saw_eos = TRUE; input->saw_eos = TRUE;
/* If not all pads are EOS yet, we send our custom EOS (which will be
* handled/dropped downstream of multiqueue) */
if (!check_all_input_streams_for_eos (input->dbin, ev)) {
GstPad *peer = gst_pad_get_peer (input->srcpad);
if (peer) {
/* Send custom-eos event to multiqueue slot */
GstEvent *event;
GST_DEBUG_OBJECT (pad,
"Got EOS end of input stream, post custom-eos");
event = gst_event_new_eos ();
gst_event_set_seqnum (event, gst_event_get_seqnum (ev));
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event),
CUSTOM_EOS_QUARK, (gchar *) CUSTOM_EOS_QUARK_DATA, NULL);
gst_pad_send_event (peer, event);
gst_object_unref (peer);
} else {
GST_FIXME_OBJECT (pad, "No peer, what should we do ?");
}
}
ret = GST_PAD_PROBE_DROP;
} }
break; break;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
@ -3195,70 +3076,6 @@ is_selection_done (GstDecodebin3 * dbin)
return msg; return msg;
} }
/** check_and_drain_multiqueue_locked:
* @dbin: A #GstDecodebin3
* @eos_event: The GST_EVENT_EOS that triggered this check.
*
* Check if all #DecodebinInputstream and #MultiqueueSlot are
* emptied/drained. If that is the case, send the final sequence of final EOS
* events based on the provided @eos_event.
*/
static void
check_and_drain_multiqueue_locked (GstDecodebin3 * dbin, GstEvent * eos_event)
{
GList *iter;
GST_DEBUG_OBJECT (dbin, "checking slots for eos");
for (iter = dbin->slots; iter; iter = iter->next) {
MultiQueueSlot *slot = iter->data;
if (slot->output && !slot->is_drained) {
GST_LOG_OBJECT (slot->sink_pad, "Not drained, not all slots are done");
return;
}
}
/* Also check with the inputs, data might be pending */
if (!all_input_streams_are_eos (dbin))
return;
GST_DEBUG_OBJECT (dbin,
"All active slots are drained, and no pending input, push EOS");
for (iter = dbin->input_streams; iter; iter = iter->next) {
GstEvent *stream_start, *eos;
DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
GstPad *peer = gst_pad_get_peer (input->srcpad);
if (!peer) {
GST_DEBUG_OBJECT (input->srcpad, "Not linked to multiqueue");
continue;
}
/* First forward a custom STREAM_START event to reset the EOS status (if
* any) */
stream_start =
gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
if (stream_start) {
GstStructure *s;
GstEvent *custom_stream_start = gst_event_copy (stream_start);
gst_event_unref (stream_start);
s = (GstStructure *) gst_event_get_structure (custom_stream_start);
gst_structure_set (s, "decodebin3-flushing-stream-start",
G_TYPE_BOOLEAN, TRUE, NULL);
gst_pad_send_event (peer, custom_stream_start);
}
/* Send EOS to all slots */
eos = gst_event_new_eos ();
gst_event_set_seqnum (eos, gst_event_get_seqnum (eos_event));
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA, NULL);
gst_pad_send_event (peer, eos);
gst_object_unref (peer);
}
}
/* /*
* Returns TRUE if there are no more streams to output and an ERROR message * Returns TRUE if there are no more streams to output and an ERROR message
* should be posted * should be posted
@ -3542,35 +3359,8 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
break; break;
case GST_EVENT_EOS: case GST_EVENT_EOS:
{ {
gboolean was_drained = slot->is_drained;
slot->is_drained = TRUE; slot->is_drained = TRUE;
/* Custom EOS handling first */
if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
CUSTOM_EOS_QUARK)) {
/* remove custom-eos */
ev = gst_event_make_writable (ev);
GST_PAD_PROBE_INFO_DATA (info) = ev;
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
CUSTOM_EOS_QUARK, NULL, NULL);
GST_LOG_OBJECT (pad, "Received custom EOS");
ret = GST_PAD_PROBE_HANDLED;
SELECTION_LOCK (dbin);
if (slot->input == NULL) {
GST_DEBUG_OBJECT (pad,
"Got custom-eos from null input stream, removing slot");
remove_slot_from_streaming_thread (dbin, slot);
ret = GST_PAD_PROBE_REMOVE;
} else if (!was_drained) {
check_and_drain_multiqueue_locked (dbin, ev);
}
if (ret == GST_PAD_PROBE_HANDLED)
gst_event_unref (ev);
SELECTION_UNLOCK (dbin);
break;
}
GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p", GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
slot->input); slot->input);
if (slot->input == NULL) { if (slot->input == NULL) {
@ -3591,17 +3381,8 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
remove_slot_from_streaming_thread (dbin, slot); remove_slot_from_streaming_thread (dbin, slot);
SELECTION_UNLOCK (dbin); SELECTION_UNLOCK (dbin);
ret = GST_PAD_PROBE_REMOVE; ret = GST_PAD_PROBE_REMOVE;
} else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
CUSTOM_FINAL_EOS_QUARK)) {
GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
} else { } else {
GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)"); GST_DEBUG_OBJECT (pad, "Got regular eos, propagating downstream");
/* drop current event as eos will be sent in check_all_slot_for_eos
* when all output streams are also eos */
ret = GST_PAD_PROBE_DROP;
SELECTION_LOCK (dbin);
check_and_drain_multiqueue_locked (dbin, ev);
SELECTION_UNLOCK (dbin);
} }
} }
break; break;