diff --git a/subprojects/gst-plugins-base/docs/plugins/gst_plugins_cache.json b/subprojects/gst-plugins-base/docs/plugins/gst_plugins_cache.json index 1a17fdae9e..efddbb81f4 100644 --- a/subprojects/gst-plugins-base/docs/plugins/gst_plugins_cache.json +++ b/subprojects/gst-plugins-base/docs/plugins/gst_plugins_cache.json @@ -11616,6 +11616,18 @@ "type": "gdouble", "writable": true }, + "parse-streams": { + "blurb": "Extract the elementary streams of non-raw sources", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true + }, "ring-buffer-max-size": { "blurb": "Max. amount of data in the ring buffer (bytes, 0 = ring buffer disabled)", "conditionally-available": false, diff --git a/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c b/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c index 46a1753ac2..147e5042c2 100644 --- a/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c +++ b/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c @@ -102,12 +102,30 @@ struct _ChildSrcPadInfo /* An optional typefind */ GstElement *typefind; - /* An optional demuxer */ + /* Pre-parsebin buffering elements. Only present is parse-streams and + * downloading *or* ring-buffer-max-size */ + GstElement *pre_parse_queue; + + /* Post-parsebin multiqueue. Only present if parse-streams and buffering is + * required */ + GstElement *multiqueue; + + /* An optional demuxer or parsebin */ GstElement *demuxer; gboolean demuxer_handles_buffering; /* list of output slots */ GList *outputs; + + /* The following fields specify how this output should be handled */ + + /* use_downloadbuffer : TRUE if the content from the source should be + * downloaded with a downloadbuffer element */ + gboolean use_downloadbuffer; + + /* use_queue2: TRUE if the contents should be buffered through a queue2 + * element */ + gboolean use_queue2; }; /* Output Slot: @@ -157,6 +175,7 @@ struct _GstURISourceBin gboolean use_buffering; gdouble low_watermark; gdouble high_watermark; + gboolean parse_streams; GstElement *source; @@ -217,6 +236,7 @@ enum #define DEFAULT_RING_BUFFER_MAX_SIZE 0 #define DEFAULT_LOW_WATERMARK 0.01 #define DEFAULT_HIGH_WATERMARK 0.60 +#define DEFAULT_PARSE_STREAMS FALSE #define ACTUAL_DEFAULT_BUFFER_SIZE 10 * 1024 * 1024 /* The value used for byte limits when buffer-size == -1 */ #define ACTUAL_DEFAULT_BUFFER_DURATION 5 * GST_SECOND /* The value used for time limits when buffer-duration == -1 */ @@ -239,6 +259,7 @@ enum PROP_LOW_WATERMARK, PROP_HIGH_WATERMARK, PROP_STATISTICS, + PROP_PARSE_STREAMS, }; #define CUSTOM_EOS_QUARK _custom_eos_quark_get () @@ -289,7 +310,6 @@ static void handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, static gboolean setup_typefind (ChildSrcPadInfo * info); static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad); static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info, - gboolean do_download, gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad); static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc); static void free_output_slot_async (GstURISourceBin * urisrc, @@ -423,6 +443,20 @@ gst_uri_source_bin_class_init (GstURISourceBinClass * klass) "this element", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** + * GstURISourceBin:parse-streams: + * + * A `parsebin` element will be used on all non-raw streams, and urisourcebin + * will output the elementary streams. Recommended when buffering is used + * since it will provide accurate buffering levels. + * + * Since: 1.22 + */ + g_object_class_install_property (gobject_class, PROP_PARSE_STREAMS, + g_param_spec_boolean ("parse-streams", "Parse Streams", + "Extract the elementary streams of non-raw sources", + DEFAULT_PARSE_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * GstURISourceBin::drained: * @@ -560,6 +594,9 @@ gst_uri_source_bin_set_property (GObject * object, guint prop_id, urisrc->high_watermark = g_value_get_double (value); update_queue_values (urisrc); break; + case PROP_PARSE_STREAMS: + urisrc->parse_streams = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -616,6 +653,9 @@ gst_uri_source_bin_get_property (GObject * object, guint prop_id, case PROP_STATISTICS: g_value_take_boxed (value, get_queue_statistics (urisrc)); break; + case PROP_PARSE_STREAMS: + g_value_set_boolean (value, urisrc->parse_streams); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -659,6 +699,19 @@ free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc) g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc); g_list_free (info->outputs); + if (info->multiqueue) { + GST_DEBUG_OBJECT (urisrc, "Removing multiqueue"); + gst_element_set_state (info->multiqueue, GST_STATE_NULL); + remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->multiqueue)); + gst_bin_remove (GST_BIN_CAST (urisrc), info->multiqueue); + } + + if (info->pre_parse_queue) { + gst_element_set_state (info->pre_parse_queue, GST_STATE_NULL); + remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->pre_parse_queue)); + gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue); + } + g_free (info); } @@ -700,17 +753,14 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad, OutputSlotInfo *slot; GstPad *output_pad; + GST_DEBUG_OBJECT (element, "New pad %" GST_PTR_FORMAT, pad); + GST_URI_SOURCE_BIN_LOCK (urisrc); /* If the demuxer handles buffering and is streams-aware, we can expose it as-is directly. We still add an event probe to deal with EOS */ - slot = - new_output_slot (info, FALSE, FALSE, info->demuxer_handles_buffering, - pad); + slot = new_output_slot (info, pad); output_pad = gst_object_ref (slot->output_pad); - GST_DEBUG_OBJECT (element, - "New streams-aware demuxer pad %s:%s , exposing directly", - GST_DEBUG_PAD_NAME (pad)); slot->demuxer_event_probe_id = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events, @@ -968,27 +1018,104 @@ on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec, (GstElementCallAsyncFunc) update_queue_values, NULL, NULL); } +static void +setup_downloadbuffer (GstURISourceBin * urisrc, GstElement * downloadbuffer) +{ + gchar *temp_template, *filename; + const gchar *tmp_dir, *prgname; + + tmp_dir = g_get_user_cache_dir (); + prgname = g_get_prgname (); + if (prgname == NULL) + prgname = "GStreamer"; + + filename = g_strdup_printf ("%s-XXXXXX", prgname); + + /* build our filename */ + temp_template = g_build_filename (tmp_dir, filename, NULL); + + GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)", + temp_template, tmp_dir, prgname, filename); + + /* configure progressive download for selected media types */ + g_object_set (downloadbuffer, "temp-template", temp_template, NULL); + + g_free (filename); + g_free (temp_template); +} + +static void +setup_multiqueue (GstURISourceBin * urisrc, ChildSrcPadInfo * info, + GstElement * multiqueue) +{ + if (info->use_downloadbuffer) { + /* If we have a downloadbuffer we will let that one deal with buffering, + and we only use multiqueue for dealing with interleave */ + g_object_set (info->multiqueue, "use-buffering", FALSE, NULL); + } else { + /* Else we set the minimum interleave time of multiqueue to the required + * buffering duration and ask it to report buffering */ + g_object_set (info->multiqueue, "use-buffering", TRUE, + "min-interleave-time", GET_BUFFER_DURATION (urisrc), NULL); + } + /* Common properties */ + g_object_set (info->multiqueue, + "sync-by-running-time", TRUE, + "use-interleave", TRUE, + "max-size-bytes", 0, + "max-size-buffers", 0, + "low-watermark", urisrc->low_watermark, + "high-watermark", urisrc->high_watermark, NULL); + gst_bin_add (GST_BIN_CAST (urisrc), info->multiqueue); + gst_element_sync_state_with_parent (info->multiqueue); +} + /* Called with lock held */ static OutputSlotInfo * -new_output_slot (ChildSrcPadInfo * info, gboolean do_download, - gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad) +new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad) { GstURISourceBin *urisrc = info->urisrc; OutputSlotInfo *slot; GstPad *srcpad; GstElement *queue = NULL; const gchar *elem_name; + gboolean use_downloadbuffer; GST_DEBUG_OBJECT (urisrc, - "do_download:%d is_adaptive:%d, no_buffering:%d, originating_pad:%" - GST_PTR_FORMAT, do_download, is_adaptive, no_buffering, originating_pad); + "use_queue2:%d use_downloadbuffer:%d, demuxer:%d, originating_pad:%" + GST_PTR_FORMAT, info->use_queue2, info->use_downloadbuffer, + info->demuxer != NULL, originating_pad); slot = g_new0 (OutputSlotInfo, 1); slot->linked_info = info; - /* If buffering is required, create the element */ - if (!no_buffering) { - if (do_download) + /* If a demuxer/parsebin is present, then the downloadbuffer will have been handled before that */ + use_downloadbuffer = info->use_downloadbuffer && !info->demuxer; + + /* If parsebin is used and buffering is required, we go through a multiqueue */ + if (urisrc->parse_streams && (info->use_queue2 || info->use_downloadbuffer)) { + GST_DEBUG_OBJECT (urisrc, "Using multiqueue"); + if (!info->multiqueue) { + GST_DEBUG_OBJECT (urisrc, + "Creating multiqueue for buffering elementary streams"); + elem_name = "multiqueue"; + info->multiqueue = gst_element_factory_make (elem_name, NULL); + if (!info->multiqueue) + goto no_buffer_element; + setup_multiqueue (urisrc, info, info->multiqueue); + } + + slot->queue_sinkpad = + gst_element_request_pad_simple (info->multiqueue, "sink_%u"); + srcpad = gst_pad_get_single_internal_link (slot->queue_sinkpad); + slot->output_pad = create_output_pad (slot, srcpad); + gst_object_unref (srcpad); + gst_pad_link (originating_pad, slot->queue_sinkpad); + } + /* If buffering is required, create the element. If downloadbuffer is + * required, it will take precedence over queue2 */ + else if (use_downloadbuffer || info->use_queue2) { + if (use_downloadbuffer) elem_name = "downloadbuffer"; else elem_name = "queue2"; @@ -1003,40 +1130,23 @@ new_output_slot (ChildSrcPadInfo * info, gboolean do_download, g_signal_connect (G_OBJECT (queue), "notify::bitrate", (GCallback) on_queue_bitrate_changed, urisrc); - if (do_download) { - gchar *temp_template, *filename; - const gchar *tmp_dir, *prgname; - - tmp_dir = g_get_user_cache_dir (); - prgname = g_get_prgname (); - if (prgname == NULL) - prgname = "GStreamer"; - - filename = g_strdup_printf ("%s-XXXXXX", prgname); - - /* build our filename */ - temp_template = g_build_filename (tmp_dir, filename, NULL); - - GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)", - temp_template, tmp_dir, prgname, filename); - - /* configure progressive download for selected media types */ - g_object_set (queue, "temp-template", temp_template, NULL); - - g_free (filename); - g_free (temp_template); + if (use_downloadbuffer) { + setup_downloadbuffer (urisrc, slot->queue); } else { - if (is_adaptive) { - GST_LOG_OBJECT (urisrc, "Adding queue2 for adaptive streaming stream"); - g_object_set (queue, "use-buffering", urisrc->use_buffering, - "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL); + g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL); + if (info->demuxer) { + /* If a adaptive demuxer or parsebin is used, use more accurate information */ + g_object_set (queue, "use-tags-bitrate", TRUE, "use-rate-estimate", + FALSE, NULL); } else { - GST_LOG_OBJECT (urisrc, "Adding queue for buffering"); - g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL); + GST_DEBUG_OBJECT (queue, + "Setting ring-buffer-max-size %" G_GUINT64_FORMAT, + urisrc->ring_buffer_max_size); + /* Else allow ring-buffer-max-size setting to be used */ + g_object_set (queue, "ring-buffer-max-size", + urisrc->ring_buffer_max_size, NULL); } - g_object_set (queue, "ring-buffer-max-size", - urisrc->ring_buffer_max_size, NULL); /* Disable max-size-buffers - queue based on data rate to the default time limit */ g_object_set (queue, "max-size-buffers", 0, NULL); @@ -1075,8 +1185,9 @@ new_output_slot (ChildSrcPadInfo * info, gboolean do_download, /* save output slot so we can remove it later */ info->outputs = g_list_append (info->outputs, slot); - GST_DEBUG_OBJECT (urisrc, "New slot for output_pad %" GST_PTR_FORMAT, - slot->output_pad); + GST_DEBUG_OBJECT (urisrc, + "New output_pad %" GST_PTR_FORMAT " for originating pad %" GST_PTR_FORMAT, + slot->output_pad, originating_pad); return slot; @@ -1550,11 +1661,13 @@ analyse_pad_foreach (const GValue * item, AnalyseData * data) OutputSlotInfo *slot; GST_URI_SOURCE_BIN_LOCK (urisrc); - /* Only use buffering on raw pads in very specific conditions */ + /* Only use buffering (via queue2) on raw pads in very specific + * conditions */ + info->use_queue2 = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri); + GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d", urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri)); - slot = new_output_slot (info, FALSE, FALSE, !urisrc->use_buffering - || !IS_QUEUE_URI (urisrc->uri), pad); + slot = new_output_slot (info, pad); if (!slot) { res = FALSE; @@ -1732,6 +1845,99 @@ no_demuxer: } } +static gboolean +setup_parsebin_for_slot (ChildSrcPadInfo * info, GstPad * originating_pad) +{ + GstURISourceBin *urisrc = info->urisrc; + GstPad *sinkpad; + GstPadLinkReturn link_res; + + GST_DEBUG_OBJECT (urisrc, "Setting up parsebin for %" GST_PTR_FORMAT, + originating_pad); + + GST_STATE_LOCK (urisrc); + GST_URI_SOURCE_BIN_LOCK (urisrc); + + /* Set up optional pre-parsebin download/ringbuffer elements */ + if (info->use_downloadbuffer || urisrc->ring_buffer_max_size) { + if (info->use_downloadbuffer) { + GST_DEBUG_OBJECT (urisrc, "Setting up pre-parsebin downloadbuffer"); + info->pre_parse_queue = gst_element_factory_make ("downloadbuffer", NULL); + setup_downloadbuffer (urisrc, info->pre_parse_queue); + g_object_set (info->pre_parse_queue, "max-size-bytes", + GET_BUFFER_SIZE (urisrc), "max-size-time", + (guint64) GET_BUFFER_DURATION (urisrc), NULL); + } else if (urisrc->ring_buffer_max_size) { + /* If a ring-buffer-max-size is specified with parsebin, we set it up on + * the queue2 *before* parsebin. We will use its buffering levels instead + * of the ones from multiqueue */ + GST_DEBUG_OBJECT (urisrc, + "Setting up pre-parsebin queue2 for ring-buffer-max-size %" + G_GUINT64_FORMAT, urisrc->ring_buffer_max_size); + info->pre_parse_queue = gst_element_factory_make ("queue2", NULL); + /* We do not use this queue2 for buffering levels, but the multiqueue */ + g_object_set (info->pre_parse_queue, "use-buffering", FALSE, + "ring-buffer-max-size", urisrc->ring_buffer_max_size, + "max-size-buffers", 0, NULL); + } + gst_element_set_locked_state (info->pre_parse_queue, TRUE); + gst_bin_add (GST_BIN_CAST (urisrc), info->pre_parse_queue); + sinkpad = gst_element_get_static_pad (info->pre_parse_queue, "sink"); + link_res = gst_pad_link (originating_pad, sinkpad); + + gst_object_unref (sinkpad); + if (link_res != GST_PAD_LINK_OK) + goto could_not_link; + } + + info->demuxer = gst_element_factory_make ("parsebin", NULL); + if (!info->demuxer) { + post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "parsebin"); + return FALSE; + } + gst_element_set_locked_state (info->demuxer, TRUE); + gst_bin_add (GST_BIN_CAST (urisrc), info->demuxer); + + if (info->pre_parse_queue) { + if (!gst_element_link_pads (info->pre_parse_queue, "src", info->demuxer, + "sink")) + goto could_not_link; + } else { + sinkpad = gst_element_get_static_pad (info->demuxer, "sink"); + + link_res = gst_pad_link (originating_pad, sinkpad); + + gst_object_unref (sinkpad); + if (link_res != GST_PAD_LINK_OK) + goto could_not_link; + } + + /* set up callbacks to create the links between parsebin and output */ + g_signal_connect (info->demuxer, + "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info); + g_signal_connect (info->demuxer, + "pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info); + + if (info->pre_parse_queue) { + gst_element_set_locked_state (info->pre_parse_queue, FALSE); + gst_element_sync_state_with_parent (info->pre_parse_queue); + } + gst_element_set_locked_state (info->demuxer, FALSE); + gst_element_sync_state_with_parent (info->demuxer); + GST_URI_SOURCE_BIN_UNLOCK (urisrc); + GST_STATE_UNLOCK (urisrc); + return TRUE; + +could_not_link: + { + GST_URI_SOURCE_BIN_UNLOCK (urisrc); + GST_STATE_UNLOCK (urisrc); + GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION, + (NULL), ("Can't link to (pre-)parsebin element")); + return FALSE; + } +} + /* Called when: * * Source element adds a new pad * * typefind has found a type @@ -1743,7 +1949,6 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps) gboolean is_raw; GstStructure *s; const gchar *media_type; - gboolean do_download = FALSE; GST_URI_SOURCE_BIN_LOCK (urisrc); @@ -1754,7 +1959,7 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps) GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT ", exposing", caps); - slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad); + slot = new_output_slot (info, srcpad); output_pad = gst_object_ref (slot->output_pad); GST_URI_SOURCE_BIN_UNLOCK (urisrc); @@ -1781,7 +1986,8 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps) /* Query the demuxer to see if it can handle buffering */ query = gst_query_new_buffering (GST_FORMAT_TIME); - info->demuxer_handles_buffering = gst_element_query (info->demuxer, query); + info->use_queue2 = urisrc->use_buffering + && !gst_element_query (info->demuxer, query); gst_query_unref (query); GST_DEBUG_OBJECT (urisrc, "Demuxer handles buffering : %d", info->demuxer_handles_buffering); @@ -1798,45 +2004,55 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps) gst_element_sync_state_with_parent (info->demuxer); } else if (!urisrc->is_stream) { - OutputSlotInfo *slot; - GstPad *output_pad; + if (urisrc->parse_streams) { + /* GST_URI_SOURCE_BIN_LOCK (urisrc); */ + setup_parsebin_for_slot (info, srcpad); + /* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */ + } else { + /* We don't need buffering here, expose immediately */ + OutputSlotInfo *slot; + GstPad *output_pad; - /* We don't need buffering here, expose immediately */ - GST_URI_SOURCE_BIN_LOCK (urisrc); - slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad); - output_pad = gst_object_ref (slot->output_pad); - GST_URI_SOURCE_BIN_UNLOCK (urisrc); - expose_output_pad (urisrc, output_pad); - gst_object_unref (output_pad); + GST_URI_SOURCE_BIN_LOCK (urisrc); + slot = new_output_slot (info, srcpad); + output_pad = gst_object_ref (slot->output_pad); + GST_URI_SOURCE_BIN_UNLOCK (urisrc); + expose_output_pad (urisrc, output_pad); + gst_object_unref (output_pad); + } } else { - OutputSlotInfo *slot; - GstPad *output_pad; - /* only enable download buffering if the upstream duration is known */ if (urisrc->download) { GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES); if (gst_pad_query (srcpad, query)) { gint64 dur; gst_query_parse_duration (query, NULL, &dur); - do_download = (dur != -1); + info->use_downloadbuffer = (dur != -1); } gst_query_unref (query); } + info->use_queue2 = urisrc->use_buffering; - GST_DEBUG_OBJECT (urisrc, "check media-type %s, do_download:%d", media_type, - do_download); + if (urisrc->parse_streams) { + /* GST_URI_SOURCE_BIN_LOCK (urisrc); */ + setup_parsebin_for_slot (info, srcpad); + /* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */ + } else { + OutputSlotInfo *slot; + GstPad *output_pad; - GST_URI_SOURCE_BIN_LOCK (urisrc); - slot = new_output_slot (info, do_download, FALSE, FALSE, srcpad); + GST_URI_SOURCE_BIN_LOCK (urisrc); + slot = new_output_slot (info, srcpad); - gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, - pre_queue_event_probe, urisrc, NULL); + gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, + pre_queue_event_probe, urisrc, NULL); - output_pad = gst_object_ref (slot->output_pad); - GST_URI_SOURCE_BIN_UNLOCK (urisrc); + output_pad = gst_object_ref (slot->output_pad); + GST_URI_SOURCE_BIN_UNLOCK (urisrc); - expose_output_pad (urisrc, output_pad); - gst_object_unref (output_pad); + expose_output_pad (urisrc, output_pad); + gst_object_unref (output_pad); + } } return; @@ -1946,8 +2162,12 @@ free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc) gst_element_set_state (slot->queue, GST_STATE_NULL); remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue)); gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue); - - gst_object_unref (slot->queue_sinkpad); + } + if (slot->queue_sinkpad) { + if (slot->linked_info && slot->linked_info->multiqueue) + gst_element_release_request_pad (slot->linked_info->multiqueue, + slot->queue_sinkpad); + gst_object_replace ((GstObject **) & slot->queue_sinkpad, NULL); } if (slot->demuxer_event_probe_id) @@ -2237,7 +2457,6 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) return; } - g_mutex_lock (&urisrc->buffering_post_lock); /* @@ -2379,6 +2598,20 @@ handle_message (GstBin * bin, GstMessage * msg) } break; } + case GST_MESSAGE_STREAM_COLLECTION: + /* We only want to forward stream collection from the source element *OR* + * from adaptive demuxers. We do not want to forward them from the + * potential parsebins since there might be many and require aggregation + * to be useful/coherent. */ + if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source + && !urisrc->is_adaptive) { + GST_DEBUG_OBJECT (bin, + "Dropping stream-collection from non-adaptive-demuxer %" + GST_PTR_FORMAT, GST_MESSAGE_SRC (msg)); + gst_message_unref (msg); + msg = NULL; + } + break; case GST_MESSAGE_BUFFERING: handle_buffering_message (urisrc, msg); msg = NULL;