diff --git a/subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c b/subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c index 2acc70d070..d8e0fa3ec4 100644 --- a/subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c +++ b/subprojects/gst-plugins-base/gst/playback/gstdecodebin3.c @@ -145,7 +145,7 @@ * | | | * Probes --/--------/-------------/ * - * ATOMIC SWITCHING + * 4) ATOMIC SWITCHING * * We want to ensure we re-use decoders when switching streams. This takes place * at the multiqueue output level. @@ -163,8 +163,34 @@ * All streams within that list should be activated * 2) to_activate * List of streams that will be moved to requested_selection in the - * mq_slot_reassign() method (i.e. once a stream was deactivated, and the output - * was retargetted) + * mq_slot_reassign() method (i.e. once a stream was deactivated, and the + * output was retargetted) + * + * 5) Change on collections/selection + * + * The collection of streams available can change through time. Either because + * there is an update in the stream itself (like mpeg-ts/mpeg-ts), or from + * gapless playback changes, etc ... + * + * This therefore means that there are potentially more than one "collection" of + * streams in decodebin3 (i.e. input streams might be different from what is + * being outputted). And the incoming stream selection might be happening on any + * of those. + * + * In order to handle that, decodebin3 has a list of `DecodebinCollection` which + * groups together the GstStreamCollection, the requested streams to activate + * and their status. + * + * * The input and output collection can be different + * * The output_collection is the one *currently* present on the output of + * multiqueue + * * For each incoming GST_EVENT_SELECT_STREAMS, we figure out the oldest + * GstStreamCollection to which this applies and store the list of requested + * streams. If it is the current output_collection we can handle the switch + * immediately, else it will be handled later. + * * By detecting the new GST_EVENT_STREAM_START on the output of multiqueue, we + * can identify when we are switching to a new DecodebinCollection. If that is + * the case we progressively switch over to the new requested streams. */ @@ -211,6 +237,37 @@ typedef struct _DecodebinInputStream DecodebinInputStream; typedef struct _DecodebinInput DecodebinInput; typedef struct _DecodebinOutputStream DecodebinOutputStream; +/* Store information regarding collections */ +typedef struct +{ + GstStreamCollection *collection; + /* The list of stream-ids requested for this collection. + * + * Can be NULL (we need to make a selection ourselves when this collection + * starts to appear on the output of multiqueue) + */ + GList *requested_selection; + + /* TEMPORARY : List of streams to activate, LEGACY usage */ + GList *to_activate; + + /* The seqnum of the event that created the list of requested streams + * (GST_SEQNUM_INVALID if not requested from outside) */ + guint32 seqnum; + + /* TRUE if GST_MESSAGE_STREAMS_SELECTED was posted for the stream_ids. Must be + * resetted whenever the stream_ids changes */ + gboolean posted_streams_selected_msg; + + /* TRUE if all stream_ids have an associated MultiqueueSlot. i.e. the + * collection is active */ + gboolean all_streams_present; + + /* TRUE if this collection is an update of the previous one. i.e. it only + * *adds* new streams. */ + gboolean is_update; +} DecodebinCollection; + typedef struct { GstElement *element; @@ -245,17 +302,15 @@ struct _GstDecodebin3 GList *slots; /* List of MultiQueueSlot */ guint slot_id; - /* Active collection */ - GstStreamCollection *collection; - /* requested selection of stream-id to activate post-multiqueue */ - GList *requested_selection; - /* List of stream-id that need to be activated (after a stream switch for ex) */ - GList *to_activate; - /* Pending select streams event */ - guint32 select_streams_seqnum; - /* TRUE if requested_selection was updated, will become FALSE once - * it has fully transitioned to active */ - gboolean selection_updated; + /* List of DecodebinCollection in existence. ordered by oldest (i.e. first is + * currently outputted, last is most recent incoming */ + GList *collections; + + /* Current input collection. */ + DecodebinCollection *input_collection; + + /* Current output collection */ + DecodebinCollection *output_collection; /* End of variables protected by selection_lock */ /* Upstream handles stream selection */ @@ -527,6 +582,10 @@ static gboolean gst_decodebin3_send_event (GstElement * element, static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin); +static DecodebinCollection *db_collection_new (GstStreamCollection * + collection); +static void db_collection_free (DecodebinCollection * collection); + static void gst_decodebin_input_reset (DecodebinInput * input); static void gst_decodebin_input_free (DecodebinInput * input); static DecodebinInput *gst_decodebin_input_new (GstDecodebin3 * dbin, @@ -552,8 +611,11 @@ static MultiQueueSlot DecodebinInputStream * input); static void mq_slot_free (GstDecodebin3 * dbin, MultiQueueSlot * slot); +static void handle_stream_switch (GstDecodebin3 * dbin); + static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin); -static void update_requested_selection (GstDecodebin3 * dbin); +static void update_requested_selection (GstDecodebin3 * dbin, + DecodebinCollection * new_collection); static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad, DecodebinInput * input); @@ -711,19 +773,9 @@ gst_decodebin3_reset (GstDecodebin3 * dbin) dbin->current_mq_min_interleave = dbin->default_mq_min_interleave; dbin->upstream_handles_selection = FALSE; - if (dbin->collection) { - gst_clear_object (&dbin->collection); - } - - g_list_free_full (dbin->requested_selection, g_free); - dbin->requested_selection = NULL; - - g_list_free (dbin->to_activate); - dbin->to_activate = NULL; - - dbin->select_streams_seqnum = GST_SEQNUM_INVALID; - - dbin->selection_updated = FALSE; + g_list_free_full (dbin->collections, (GDestroyNotify) db_collection_free); + dbin->collections = NULL; + dbin->input_collection = dbin->output_collection = NULL; } static void @@ -748,12 +800,6 @@ gst_decodebin3_dispose (GObject * object) } g_mutex_unlock (&dbin->factories_lock); - SELECTION_LOCK (dbin); - if (dbin->collection) { - gst_clear_object (&dbin->collection); - } - SELECTION_UNLOCK (dbin); - INPUT_LOCK (dbin); if (dbin->main_input) { gst_decodebin_input_free (dbin->main_input); @@ -1786,7 +1832,6 @@ gst_decodebin3_release_pad (GstElement * element, GstPad * pad) (GstPadProbeCallback) query_duration_drop_probe, input, NULL); gst_element_post_message (GST_ELEMENT_CAST (dbin), msg); - update_requested_selection (dbin); if (input->parsebin) { gst_pad_remove_probe (input->parsebin_sink, probe_id); @@ -2046,9 +2091,6 @@ sink_event_function (GstPad * sinkpad, GstDecodebin3 * dbin, GstEvent * event) INPUT_UNLOCK (dbin); if (collection_msg) gst_element_post_message (GST_ELEMENT_CAST (dbin), collection_msg); - - /* In all cases we want to make sure the selection is valid */ - update_requested_selection (dbin); } /* If we are waiting to create an identity passthrough, do it now */ @@ -2261,23 +2303,6 @@ remove_from_list (GList * list, const gchar * sid) return list; } -static gboolean -stream_list_equal (GList * lista, GList * listb) -{ - GList *tmp; - - if (g_list_length (lista) != g_list_length (listb)) - return FALSE; - - for (tmp = lista; tmp; tmp = tmp->next) { - gchar *osid = tmp->data; - if (!stream_in_list (listb, osid)) - return FALSE; - } - - return TRUE; -} - /* Called with SELECTION_LOCK */ static gboolean stream_is_active (GstDecodebin3 * dbin, const gchar * stream_id) @@ -2297,41 +2322,52 @@ stream_is_active (GstDecodebin3 * dbin, const gchar * stream_id) static gboolean stream_is_requested (GstDecodebin3 * dbin, const gchar * stream_id) { - return stream_in_list (dbin->requested_selection, stream_id) != NULL; + if (dbin->output_collection == NULL) + return FALSE; + return stream_in_list (dbin->output_collection->requested_selection, + stream_id) != NULL; } + +/** update_requested_selection: + * @dbin: A #GstDecodebin3 + * @new_collection: The #DecodebinCollection to update + * + * Figures out the selection to use for @new_collection. Will figure this out + * based on signals and current output collection. + * + * This function should be called once we start seeing a #DecodebinCollection on + * the output of multiqueue. + * + * Must be called with the SELECTION_LOCK taken + */ static void -update_requested_selection (GstDecodebin3 * dbin) +update_requested_selection (GstDecodebin3 * dbin, + DecodebinCollection * new_collection) { guint i, nb; GList *tmp = NULL; gboolean all_user_selected = TRUE; GstStreamType used_types = 0; - GstStreamCollection *collection; - /* 1. Is there a pending SELECT_STREAMS we can return straight away since - * the switch handler will take care of the pending selection */ - SELECTION_LOCK (dbin); - collection = dbin->collection; - if (G_UNLIKELY (collection == NULL)) { - GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection"); - goto beach; + if (new_collection->requested_selection) { + GST_DEBUG_OBJECT (dbin, "Collection already has a selection"); + return; } - nb = gst_stream_collection_get_size (collection); - /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */ - GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE"); + nb = gst_stream_collection_get_size (new_collection->collection); /* 3. If not, check if we already have some of the streams in the * existing active/requested selection */ for (i = 0; i < nb; i++) { - GstStream *stream = gst_stream_collection_get_stream (collection, i); + GstStream *stream = + gst_stream_collection_get_stream (new_collection->collection, i); const gchar *sid = gst_stream_get_stream_id (stream); gint request = -1; /* Fire select-stream signal to see if outside components want to * hint at which streams should be selected */ g_signal_emit (G_OBJECT (dbin), - gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream, - &request); + gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, + new_collection->collection, stream, &request); GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request); if (request == -1) @@ -2352,9 +2388,10 @@ update_requested_selection (GstDecodebin3 * dbin) } /* 4. If the user didn't explicitly selected all streams, match one stream of each type */ - if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) { + if (!all_user_selected && new_collection->seqnum == GST_SEQNUM_INVALID) { for (i = 0; i < nb; i++) { - GstStream *stream = gst_stream_collection_get_stream (collection, i); + GstStream *stream = + gst_stream_collection_get_stream (new_collection->collection, i); GstStreamType curtype = gst_stream_get_stream_type (stream); if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) { const gchar *sid = gst_stream_get_stream_id (stream); @@ -2367,27 +2404,13 @@ update_requested_selection (GstDecodebin3 * dbin) } } -beach: - if (stream_list_equal (tmp, dbin->requested_selection)) { - /* If the selection is equal, there is nothign to do */ - GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection"); - g_list_free (tmp); - tmp = NULL; - } - if (tmp) { /* Finally set the requested selection */ - if (dbin->requested_selection) { - GST_FIXME_OBJECT (dbin, - "Replacing non-NULL requested_selection, what should we do ??"); - g_list_free_full (dbin->requested_selection, g_free); - } - dbin->requested_selection = + new_collection->requested_selection = g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL); - dbin->selection_updated = TRUE; + new_collection->posted_streams_selected_msg = FALSE; g_list_free (tmp); } - SELECTION_UNLOCK (dbin); } /* sort_streams: @@ -2569,15 +2592,15 @@ find_message_parsebin (GstDecodebin3 * dbin, GstElement * child) } static const gchar * -stream_in_collection (GstDecodebin3 * dbin, gchar * sid) +stream_in_collection (GstStreamCollection * collection, gchar * sid) { guint i, len; - if (dbin->collection == NULL) + if (collection == NULL) return NULL; - len = gst_stream_collection_get_size (dbin->collection); + len = gst_stream_collection_get_size (collection); for (i = 0; i < len; i++) { - GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i); + GstStream *stream = gst_stream_collection_get_stream (collection, i); const gchar *osid = gst_stream_get_stream_id (stream); if (!g_strcmp0 (sid, osid)) return osid; @@ -2586,6 +2609,63 @@ stream_in_collection (GstDecodebin3 * dbin, gchar * sid) return NULL; } +static DecodebinCollection * +find_collection_for_stream (GstDecodebin3 * dbin, gchar * stream_id) +{ + GList *tmp; + + GST_DEBUG_OBJECT (dbin, "stream_id `%s`", stream_id); + + /* Recursively find the collection to which this stream belongs */ + for (tmp = dbin->collections; tmp; tmp = tmp->next) { + DecodebinCollection *collection = tmp->data; + GST_DEBUG_OBJECT (dbin, "Trying on DBCollection %p", collection); + if (stream_in_collection (collection->collection, stream_id)) + return collection; + } + + return NULL; +} + +static gboolean +are_all_streams_in_collection (GstStreamCollection * collection, + GList * streams) +{ + GList *tmp; + + for (tmp = streams; tmp; tmp = tmp->next) { + if (!stream_in_collection (collection, tmp->data)) + return FALSE; + } + return TRUE; +} + +static void +db_collection_free (DecodebinCollection * collection) +{ + GST_DEBUG ("Freeing collection %p for %" GST_PTR_FORMAT, collection, + collection->collection); + gst_object_unref (collection->collection); + g_list_free_full (collection->requested_selection, g_free); + g_list_free (collection->to_activate); + + g_free (collection); +} + +static DecodebinCollection * +db_collection_new (GstStreamCollection * collection) +{ + DecodebinCollection *db_collection = g_new0 (DecodebinCollection, 1); + + db_collection->collection = collection; + db_collection->seqnum = GST_SEQNUM_INVALID; + + GST_DEBUG ("Created new collection %p for %" GST_PTR_FORMAT, db_collection, + collection); + + return db_collection; +} + /** handle_stream_collection_locked: * @dbin: * @collection: (transfer none): The new collection for @input. Can be %NULL. @@ -2605,6 +2685,7 @@ handle_stream_collection_locked (GstDecodebin3 * dbin, GstStreamCollection * collection, DecodebinInput * input) { GstMessage *message = NULL; + gboolean is_update = FALSE; #ifndef GST_DISABLE_GST_DEBUG const gchar *upstream_id; guint i; @@ -2654,31 +2735,41 @@ handle_stream_collection_locked (GstDecodebin3 * dbin, } #endif - /* Store collection for later usage */ SELECTION_LOCK (dbin); - if (dbin->collection == NULL) { - GST_DEBUG_OBJECT (dbin, "Storing updated global collection"); - dbin->collection = collection; - message = - gst_message_new_stream_collection ((GstObject *) dbin, collection); - } else if (dbin->collection == collection) { - GST_DEBUG_OBJECT (dbin, "Collection didn't change"); - gst_object_unref (collection); - } else { - /* We need to check who emitted this collection (the owner). - * If we already had a collection from that user, this one is an update, - * that is to say that we need to figure out how we are going to re-use - * the streams/slot */ - GST_FIXME_OBJECT (dbin, "New collection but already had one ..."); - /* FIXME : When do we switch from pending collection to active collection ? - * When all streams from active collection are drained in multiqueue output ? */ - gst_object_unref (dbin->collection); - dbin->collection = collection; - message = - gst_message_new_stream_collection ((GstObject *) dbin, collection); + /* If collection is same as current input collection, leave */ + if (dbin->input_collection) { + GstStreamCollection *previous = dbin->input_collection->collection; + + if (collection == previous) { + GST_DEBUG_OBJECT (dbin, "Collection didn't change"); + gst_object_unref (collection); + SELECTION_UNLOCK (dbin); + return NULL; + } + /* Check if this collection is an update of the previous one */ + if (gst_stream_collection_get_size (collection) > + gst_stream_collection_get_size (previous)) { + guint i; + is_update = TRUE; + for (i = 0; i < gst_stream_collection_get_size (previous); i++) { + GstStream *stream = gst_stream_collection_get_stream (previous, i); + const gchar *sid = gst_stream_get_stream_id (stream); + if (!stream_in_collection (collection, (gchar *) sid)) { + is_update = FALSE; + break; + } + } + } } - if (message) - dbin->select_streams_seqnum = GST_SEQNUM_INVALID; + + /* We have a new collection, store it */ + GST_DEBUG_OBJECT (dbin, "Switching to new input collection (is_update:%d)", + is_update); + dbin->input_collection = db_collection_new (collection); + dbin->input_collection->is_update = is_update; + dbin->collections = g_list_append (dbin->collections, dbin->input_collection); + message = gst_message_new_stream_collection ((GstObject *) dbin, collection); + SELECTION_UNLOCK (dbin); return message; @@ -2688,7 +2779,6 @@ static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message) { GstDecodebin3 *dbin = (GstDecodebin3 *) bin; - gboolean posting_collection = FALSE; GList *l; GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message)); @@ -2751,7 +2841,6 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message) INPUT_UNLOCK (dbin); if (collection_message) { - posting_collection = TRUE; gst_message_unref (message); message = collection_message; } @@ -2789,11 +2878,6 @@ gst_decodebin3_handle_message (GstBin * bin, GstMessage * message) GST_BIN_CLASS (parent_class)->handle_message (bin, message); - if (posting_collection) { - /* Figure out a selection for that collection */ - update_requested_selection (dbin); - } - return; drop_message: @@ -2914,7 +2998,6 @@ mq_slot_get_or_create_output (MultiQueueSlot * slot) GstDecodebin3 *dbin = slot->dbin; DecodebinOutputStream *output = NULL; const gchar *stream_id; - gchar *id_in_list = NULL; /* If we already have a configured output, just use it */ if (slot->output != NULL) { @@ -2923,33 +3006,10 @@ mq_slot_get_or_create_output (MultiQueueSlot * slot) return slot->output; } - /* - * FIXME - * - * This method needs to be split into multiple parts - * - * 1) Figure out whether stream should be exposed or not - * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence - * in the default stream attribution - * - * 2) Figure out whether an output stream should be created, whether - * we can re-use the output stream already linked to the slot, or - * whether we need to get re-assigned another (currently used) output - * stream. - */ - stream_id = slot->active_stream_id; GST_DEBUG_OBJECT (slot->src_pad, "active stream %" GST_PTR_FORMAT, slot->active_stream); - /* 0. Emit autoplug-continue signal for pending caps ? */ - GST_FIXME_OBJECT (dbin, "emit autoplug-continue"); - - /* 1. if in EXPOSE_ALL_MODE, just accept */ - GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE"); - - /* 3. In default mode check if we should expose */ - id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id); /* If the stream is not requested, bail out */ if (!stream_is_requested (dbin, stream_id) && !dbin->upstream_handles_selection) { @@ -2967,10 +3027,6 @@ mq_slot_get_or_create_output (MultiQueueSlot * slot) GST_DEBUG_OBJECT (slot->src_pad, "Reassigning to output %s:%s", GST_DEBUG_PAD_NAME (output->src_pad)); /* Move this output from its current slot to this slot */ - dbin->to_activate = g_list_append (dbin->to_activate, (gchar *) stream_id); - dbin->requested_selection = - g_list_remove (dbin->requested_selection, id_in_list); - g_free (id_in_list); SELECTION_UNLOCK (dbin); gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE, (GstPadProbeCallback) mq_slot_unassign_probe, output->slot, NULL); @@ -2995,9 +3051,7 @@ is_selection_done (GstDecodebin3 * dbin) { GList *tmp; GstMessage *msg; - - if (!dbin->selection_updated) - return NULL; + DecodebinCollection *collection = dbin->output_collection; GST_LOG_OBJECT (dbin, "Checking"); @@ -3006,11 +3060,21 @@ is_selection_done (GstDecodebin3 * dbin) return NULL; } - if (dbin->to_activate != NULL) { + if (!collection) { + GST_DEBUG ("No collection"); + return NULL; + } + + if (collection->posted_streams_selected_msg) { + GST_DEBUG ("Already posted message for this selection"); + return NULL; + } + + if (collection->to_activate != NULL) { GST_DEBUG ("Still have streams to activate"); return NULL; } - for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) { + for (tmp = collection->requested_selection; tmp; tmp = tmp->next) { GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data); if (!stream_is_active (dbin, (gchar *) tmp->data)) { GST_DEBUG ("Not in active selection, returning"); @@ -3021,25 +3085,22 @@ is_selection_done (GstDecodebin3 * dbin) GST_DEBUG_OBJECT (dbin, "Selection active, creating message"); /* We are completely active */ - msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection); - if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) { - gst_message_set_seqnum (msg, dbin->select_streams_seqnum); + msg = + gst_message_new_streams_selected ((GstObject *) dbin, + collection->collection); + if (collection->seqnum != GST_SEQNUM_INVALID) { + gst_message_set_seqnum (msg, collection->seqnum); } - for (tmp = dbin->output_streams; tmp; tmp = tmp->next) { - DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data; - if (output->slot) { - GST_DEBUG_OBJECT (dbin, "Adding stream %s", - output->slot->active_stream_id); - if (stream_in_list (dbin->requested_selection, - output->slot->active_stream_id)) - gst_message_streams_selected_add (msg, output->slot->active_stream); - else - GST_WARNING_OBJECT (dbin, - "Output slot still active for old selection ?"); - } else - GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output); + for (tmp = dbin->slots; tmp; tmp = tmp->next) { + MultiQueueSlot *slot = tmp->data; + if (slot->output) { + GST_DEBUG_OBJECT (dbin, "Adding stream %s", slot->active_stream_id); + g_assert (stream_is_requested (dbin, slot->active_stream_id)); + gst_message_streams_selected_add (msg, slot->active_stream); + } } - dbin->selection_updated = FALSE; + collection->posted_streams_selected_msg = TRUE; + return msg; } @@ -3116,7 +3177,10 @@ no_more_streams_locked (GstDecodebin3 * dbin) { GList *tmp; - if (dbin->requested_selection || dbin->to_activate) + if (!dbin->output_collection) + return FALSE; + + if (dbin->output_collection->requested_selection) return FALSE; for (tmp = dbin->slots; tmp; tmp = tmp->next) { @@ -3144,6 +3208,7 @@ mq_slot_check_reconfiguration (MultiQueueSlot * slot) DecodebinOutputStream *output; GstMessage *msg = NULL; gboolean no_more_streams; + DecodebinCollection *collection = dbin->output_collection; SELECTION_LOCK (dbin); output = mq_slot_get_or_create_output (slot); @@ -3161,9 +3226,10 @@ mq_slot_check_reconfiguration (MultiQueueSlot * slot) GST_DEBUG_OBJECT (dbin, "Removing failing stream from selection: %" GST_PTR_FORMAT, slot->active_stream); - slot->dbin->requested_selection = - remove_from_list (slot->dbin->requested_selection, + collection->requested_selection = + remove_from_list (collection->requested_selection, slot->active_stream_id); + collection->posted_streams_selected_msg = FALSE; /* Remove output */ mq_slot_set_output (slot, NULL); @@ -3171,7 +3237,6 @@ mq_slot_check_reconfiguration (MultiQueueSlot * slot) db_output_stream_free (output); no_more_streams = no_more_streams_locked (dbin); - dbin->selection_updated = TRUE; SELECTION_UNLOCK (dbin); if (msg) gst_element_post_message ((GstElement *) slot->dbin, msg); @@ -3191,11 +3256,40 @@ mq_slot_check_reconfiguration (MultiQueueSlot * slot) } } +static void +update_stream_presence (GstDecodebin3 * dbin, DecodebinCollection * collection) +{ + GList *tmp; + + if (dbin->upstream_handles_selection) { + collection->all_streams_present = TRUE; + return; + } + + if (g_list_length (dbin->slots) != + gst_stream_collection_get_size (collection->collection)) { + collection->all_streams_present = FALSE; + return; + } + + for (tmp = dbin->slots; tmp; tmp = tmp->next) { + MultiQueueSlot *slot = tmp->data; + if (!stream_in_collection (collection->collection, + (gchar *) slot->active_stream_id)) { + collection->all_streams_present = FALSE; + return; + } + } + + collection->all_streams_present = TRUE; +} + /** mq_slot_handle_stream_start: * @slot: A #MultiQueueSlot - * @stream_event: (transfer none): A #GST_EVENT_STREAM_START + * @stream_event: A #GST_EVENT_STREAM_START * - * Returns: The #GstPadProbeReturn + * Returns: The #GstPadProbeReturn. If #GST_PAD_PROBE_HANDLED the ownership of + * @stream_event was taken. */ static GstPadProbeReturn mq_slot_handle_stream_start (MultiQueueSlot * slot, GstEvent * stream_event) @@ -3203,6 +3297,8 @@ mq_slot_handle_stream_start (MultiQueueSlot * slot, GstEvent * stream_event) GstDecodebin3 *dbin = slot->dbin; GstStream *stream = NULL; const GstStructure *s = gst_event_get_structure (stream_event); + DecodebinCollection *collection; + GList *tmp, *next; /* Drop STREAM_START events used to cleanup multiqueue */ if (s && gst_structure_has_field (s, "decodebin3-flushing-stream-start")) { @@ -3217,44 +3313,117 @@ mq_slot_handle_stream_start (MultiQueueSlot * slot, GstEvent * stream_event) return GST_PAD_PROBE_OK; } + SELECTION_LOCK (dbin); + slot->is_drained = FALSE; - GST_DEBUG_OBJECT (slot->src_pad, "Stream Start '%s'", - gst_stream_get_stream_id (stream)); + GST_DEBUG_OBJECT (slot->src_pad, "%" GST_PTR_FORMAT, stream); - if (slot->active_stream == NULL) { - slot->active_stream = stream; - slot->active_stream_id = gst_stream_get_stream_id (stream); - } else if (slot->active_stream != stream) { - gboolean stream_type_changed = - gst_stream_get_stream_type (stream) != - gst_stream_get_stream_type (slot->active_stream); + /* 1. Store new stream/stream_id */ + if (slot->active_stream == stream) { + GST_DEBUG_OBJECT (slot->src_pad, "No stream change"); + goto beach; + } - GST_DEBUG_OBJECT (slot->src_pad, "Stream change (%s => %s) !", - gst_stream_get_stream_id (slot->active_stream), - gst_stream_get_stream_id (stream)); - gst_object_unref (slot->active_stream); - slot->active_stream = stream; - slot->active_stream_id = gst_stream_get_stream_id (stream); + gst_object_replace ((GstObject **) & slot->active_stream, + (GstObject *) stream); + slot->active_stream_id = gst_stream_get_stream_id (stream); - if (stream_type_changed) { - DecodebinOutputStream *previous_output; - /* The stream type has changed, we get rid of the current output. A - * new one (targetting the new stream type) will be created once the - * caps are received. */ - previous_output = mq_slot_set_output (slot, NULL); - if (previous_output) { - /* FIXME : Can we transfer this to another slot ? */ - GST_DEBUG_OBJECT (slot->src_pad, - "Stream type change, discarding current output stream"); - SELECTION_LOCK (dbin); - dbin->output_streams = - g_list_remove (dbin->output_streams, previous_output); - db_output_stream_free (previous_output); - SELECTION_UNLOCK (dbin); - } - } - } else - gst_object_unref (stream); + /* If the slot is active and the stream type is different, remove it. + * + * This will only happen in case no slots of the same type was available for + * that input (ex: switching from audio-only to video-only upstream of + * decodebin3). + */ + if (slot->output && slot->output->type != gst_stream_get_stream_type (stream)) { + DecodebinOutputStream *previous_output = slot->output; + GST_DEBUG_OBJECT (slot->src_pad, + "Slot is changing stream type, removing output"); + mq_slot_set_output (slot, NULL); + dbin->output_streams = + g_list_remove (dbin->output_streams, previous_output); + db_output_stream_free (previous_output); + } + + collection = + find_collection_for_stream (dbin, (gchar *) slot->active_stream_id); + g_assert (collection); + + /* check if all streams are present for that collection. We do it now since we + * might just have a single stream in the collection */ + update_stream_presence (dbin, collection); + + if (collection->all_streams_present) + GST_DEBUG_OBJECT (dbin, "All streams are now present for collection"); + + /* If the output collection didn't change, we can go and check if it's time to + * switch */ + if (collection == dbin->output_collection) + goto check_for_switch; + + /* Collection is different */ + GST_DEBUG_OBJECT (slot->src_pad, "Stream belongs to a new collection"); + + /* Make sure the collection has a valid selection at this point. */ + update_requested_selection (dbin, collection); + + /* NOTE: The assumption here would be that the collection for this stream is + * the "next" collection in the list of collections (i.e. the one following + * the current outputted one). + * + * But this is not always true since the collections might be rapidly + * expanding ones (like for example when a mpeg-ps or rtsp source dynamically + * adds the streams to a stream collection as it sees them). + * + * When switching output collection we need to drain out those "intermediary" + * collections and make sure they were valid. + */ + + for (tmp = dbin->collections; tmp; tmp = next) { + DecodebinCollection *candidate = tmp->data; + next = tmp->next; + + /* We have reached our target */ + if (candidate == collection) + break; + /* This is the current output collection */ + if (candidate == dbin->output_collection) + continue; + GST_DEBUG_OBJECT (dbin, + "Dropping intermediary collection %p is_update:%d %" GST_PTR_FORMAT, + candidate, candidate->is_update, candidate->collection); + + /* Dropping an intermediate collection is only possible if there wasn't any + * previous output collection or it was an update of the previous one + */ + g_assert (candidate->is_update || dbin->output_collection == NULL); + dbin->collections = g_list_remove (dbin->collections, candidate); + db_collection_free (candidate); + } + + if (dbin->output_collection == NULL) { + /* We can switch immediately to this collection */ + dbin->output_collection = collection; + goto check_for_switch; + } + + /* If the new collection is fully present, we can switch */ + if (collection->all_streams_present) { + GST_DEBUG_OBJECT (dbin, "Switching to new output collection"); + dbin->collections = + g_list_remove (dbin->collections, dbin->output_collection); + db_collection_free (dbin->output_collection); + dbin->output_collection = collection; + } + +check_for_switch: + if (!dbin->upstream_handles_selection && collection == dbin->output_collection + && collection->all_streams_present) { + handle_stream_switch (dbin); + } + +beach: + gst_object_unref (stream); + SELECTION_UNLOCK (dbin); return GST_PAD_PROBE_OK; } @@ -3916,21 +4085,13 @@ mq_slot_reassign (MultiQueueSlot * slot) DecodebinOutputStream *output; MultiQueueSlot *target_slot = NULL; GList *tmp; - const gchar *sid, *tsid; + DecodebinCollection *collection = dbin->output_collection; SELECTION_LOCK (dbin); output = slot->output; - if (G_UNLIKELY (slot->active_stream == NULL)) { - GST_DEBUG_OBJECT (slot->src_pad, - "Called on inactive slot (active_stream == NULL)"); - SELECTION_UNLOCK (dbin); - return; - } - - if (G_UNLIKELY (output == NULL)) { - GST_DEBUG_OBJECT (slot->src_pad, - "Slot doesn't have any output to be removed"); + if (G_UNLIKELY (slot->active_stream == NULL || output == NULL)) { + GST_DEBUG_OBJECT (slot->src_pad, "Called on slot not active or requested"); SELECTION_UNLOCK (dbin); return; } @@ -3952,7 +4113,7 @@ mq_slot_reassign (MultiQueueSlot * slot) /* Can we re-assign this output to a requested stream ? */ GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream"); - for (tmp = dbin->to_activate; tmp; tmp = tmp->next) { + for (tmp = collection->to_activate; tmp; tmp = tmp->next) { MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data); GST_LOG_OBJECT (slot->src_pad, "Checking slot %s:%s (output:%p , stream:%s)", @@ -3962,11 +4123,8 @@ mq_slot_reassign (MultiQueueSlot * slot) GST_DEBUG_OBJECT (slot->src_pad, "Using %s:%s as reassigned slot", GST_DEBUG_PAD_NAME (tslot->src_pad)); target_slot = tslot; - tsid = tmp->data; - /* Pass target stream id to requested selection */ - dbin->requested_selection = - g_list_append (dbin->requested_selection, g_strdup (tmp->data)); - dbin->to_activate = g_list_delete_link (dbin->to_activate, tmp); + collection->to_activate = + g_list_delete_link (collection->to_activate, tmp); break; } } @@ -4012,17 +4170,15 @@ mq_slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info, /** handle_stream_switch: * @dbin: A #GstDecodebin3 - * @select_streams: The list of stream-id to switch to - * @seqnum: The seqnum of the event that triggered this * - * Figures out which slots to (de)activate for the given @select_streams. + * Figures out which slots to (de)activate for the given output_collection. * * Must be called with SELECTION_LOCK taken. */ static void -handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams, - guint32 seqnum) +handle_stream_switch (GstDecodebin3 * dbin) { + DecodebinCollection *collection = dbin->output_collection; GList *tmp; /* List of slots to (de)activate. */ GList *slots_to_deactivate = NULL; @@ -4033,16 +4189,13 @@ handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams, GList *pending_streams = NULL; GList *slots_to_reassign = NULL; - if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) { - GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime"); - return; - } + g_return_if_fail (collection); /* COMPARE the requested streams to the active and requested streams * on multiqueue. */ /* First check the slots to activate and which ones are unknown */ - for (tmp = select_streams; tmp; tmp = tmp->next) { + for (tmp = collection->requested_selection; tmp; tmp = tmp->next) { const gchar *sid = (const gchar *) tmp->data; MultiQueueSlot *slot; GST_DEBUG_OBJECT (dbin, "Checking for requested stream '%s'", sid); @@ -4052,12 +4205,8 @@ handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams, * id */ if (slot == NULL || slot->active_stream == NULL) { /* There is no slot on which this stream is active */ - if (stream_in_collection (dbin, (gchar *) sid)) { - GST_DEBUG_OBJECT (dbin, "Adding to pending streams '%s'", sid); - pending_streams = g_list_append (pending_streams, (gchar *) sid); - } else { - GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid); - } + GST_DEBUG_OBJECT (dbin, "Adding to pending streams '%s'", sid); + pending_streams = g_list_append (pending_streams, (gchar *) sid); } else if (slot->output == NULL) { /* There is a slot on which this stream is active or pending */ GST_DEBUG_OBJECT (dbin, @@ -4081,12 +4230,13 @@ handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams, gboolean slot_to_deactivate = TRUE; if (slot->active_stream) { - if (stream_in_list (select_streams, slot->active_stream_id)) + if (stream_in_list (collection->requested_selection, + slot->active_stream_id)) slot_to_deactivate = FALSE; } if (slot_to_deactivate && slot->pending_stream && slot->pending_stream != slot->active_stream) { - if (stream_in_list (select_streams, + if (stream_in_list (collection->requested_selection, gst_stream_get_stream_id (slot->pending_stream))) slot_to_deactivate = FALSE; } @@ -4164,29 +4314,19 @@ handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams, } if (slots_to_activate == NULL && pending_streams != NULL) { - GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection"); - if (dbin->requested_selection) - g_list_free_full (dbin->requested_selection, g_free); - dbin->requested_selection = - g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL); + GST_ERROR_OBJECT (dbin, "Stream switch requested for future collection"); g_list_free (slots_to_deactivate); g_list_free (pending_streams); slots_to_deactivate = NULL; pending_streams = NULL; + /* This should never happen, this function is only called for streams present */ + g_assert (FALSE); } else { - if (dbin->requested_selection) - g_list_free_full (dbin->requested_selection, g_free); - dbin->requested_selection = - g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL); - dbin->requested_selection = - g_list_concat (dbin->requested_selection, - g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL)); - if (dbin->to_activate) - g_list_free (dbin->to_activate); - dbin->to_activate = g_list_copy (streams_to_reassign); + if (collection->to_activate) + g_list_free (collection->to_activate); + collection->to_activate = g_list_copy (streams_to_reassign); } - dbin->selection_updated = TRUE; SELECTION_UNLOCK (dbin); if (slots_to_activate && !slots_to_reassign) { @@ -4234,6 +4374,8 @@ handle_select_streams (GstDecodebin3 * dbin, GstEvent * event) { GList *streams = NULL; guint32 seqnum = gst_event_get_seqnum (event); + GList *tmp; + DecodebinCollection *collection = NULL; if (dbin->upstream_handles_selection) { GST_DEBUG_OBJECT (dbin, "Letting select-streams event flow upstream"); @@ -4249,18 +4391,37 @@ handle_select_streams (GstDecodebin3 * dbin, GstEvent * event) SELECTION_LOCK (dbin); /* Find the collection to which these list of streams apply */ + for (tmp = dbin->collections; tmp; tmp = tmp->next) { + DecodebinCollection *cand = tmp->data; + if (are_all_streams_in_collection (cand->collection, streams)) { + collection = cand; + break; + } + } - if (seqnum == dbin->select_streams_seqnum) { + if (collection == NULL) { + GST_WARNING_OBJECT (dbin, "Requested streams from no known collection"); + goto beach; + } + + if (seqnum == collection->seqnum) { GST_DEBUG_OBJECT (dbin, "Already handled/handling that SELECT_STREAMS event"); goto beach; } - dbin->select_streams_seqnum = seqnum; + /* Update the requested list of streams */ + if (collection->requested_selection) { + g_list_free_full (collection->requested_selection, g_free); + } + /* We give ownership of the streams to the DecodebinCollection */ + collection->requested_selection = streams; + collection->seqnum = seqnum; + collection->posted_streams_selected_msg = FALSE; - /* Finally handle the switch */ - handle_stream_switch (dbin, streams, seqnum); - g_list_free_full (streams, g_free); + /* If the collection is the current output one, handle the switch */ + if (collection == dbin->output_collection) + handle_stream_switch (dbin); beach: SELECTION_UNLOCK (dbin);