urisourcebin: Improve buffering handling

Introduce the option to have the streams be parsed with `parsebin` for
compatible sources (i.e. which are eligible for buffering in the same way as
before this commit).

By parsing the inputs directly, this allows more accurate buffering control:
* Instead of relying on potential bitrate information coming from somewhere
* and *without* being linked downstream

If `parse-streams` is activated and the stream is eligible for buffering, then a
`multiqueue` will be used on the output of `parsebin` in order to handle the
buffering.

API: `parse-streams`
Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2784>
This commit is contained in:
Edward Hervey 2022-11-09 16:44:18 +01:00 committed by GStreamer Marge Bot
parent fa4a70d7fe
commit 412c4cbca3
2 changed files with 324 additions and 79 deletions

View file

@ -11616,6 +11616,18 @@
"type": "gdouble", "type": "gdouble",
"writable": true "writable": true
}, },
"parse-streams": {
"blurb": "Extract the elementary streams of non-raw sources",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
},
"ring-buffer-max-size": { "ring-buffer-max-size": {
"blurb": "Max. amount of data in the ring buffer (bytes, 0 = ring buffer disabled)", "blurb": "Max. amount of data in the ring buffer (bytes, 0 = ring buffer disabled)",
"conditionally-available": false, "conditionally-available": false,

View file

@ -102,12 +102,30 @@ struct _ChildSrcPadInfo
/* An optional typefind */ /* An optional typefind */
GstElement *typefind; GstElement *typefind;
/* An optional demuxer */ /* Pre-parsebin buffering elements. Only present is parse-streams and
* downloading *or* ring-buffer-max-size */
GstElement *pre_parse_queue;
/* Post-parsebin multiqueue. Only present if parse-streams and buffering is
* required */
GstElement *multiqueue;
/* An optional demuxer or parsebin */
GstElement *demuxer; GstElement *demuxer;
gboolean demuxer_handles_buffering; gboolean demuxer_handles_buffering;
/* list of output slots */ /* list of output slots */
GList *outputs; GList *outputs;
/* The following fields specify how this output should be handled */
/* use_downloadbuffer : TRUE if the content from the source should be
* downloaded with a downloadbuffer element */
gboolean use_downloadbuffer;
/* use_queue2: TRUE if the contents should be buffered through a queue2
* element */
gboolean use_queue2;
}; };
/* Output Slot: /* Output Slot:
@ -157,6 +175,7 @@ struct _GstURISourceBin
gboolean use_buffering; gboolean use_buffering;
gdouble low_watermark; gdouble low_watermark;
gdouble high_watermark; gdouble high_watermark;
gboolean parse_streams;
GstElement *source; GstElement *source;
@ -217,6 +236,7 @@ enum
#define DEFAULT_RING_BUFFER_MAX_SIZE 0 #define DEFAULT_RING_BUFFER_MAX_SIZE 0
#define DEFAULT_LOW_WATERMARK 0.01 #define DEFAULT_LOW_WATERMARK 0.01
#define DEFAULT_HIGH_WATERMARK 0.60 #define DEFAULT_HIGH_WATERMARK 0.60
#define DEFAULT_PARSE_STREAMS FALSE
#define ACTUAL_DEFAULT_BUFFER_SIZE 10 * 1024 * 1024 /* The value used for byte limits when buffer-size == -1 */ #define ACTUAL_DEFAULT_BUFFER_SIZE 10 * 1024 * 1024 /* The value used for byte limits when buffer-size == -1 */
#define ACTUAL_DEFAULT_BUFFER_DURATION 5 * GST_SECOND /* The value used for time limits when buffer-duration == -1 */ #define ACTUAL_DEFAULT_BUFFER_DURATION 5 * GST_SECOND /* The value used for time limits when buffer-duration == -1 */
@ -239,6 +259,7 @@ enum
PROP_LOW_WATERMARK, PROP_LOW_WATERMARK,
PROP_HIGH_WATERMARK, PROP_HIGH_WATERMARK,
PROP_STATISTICS, PROP_STATISTICS,
PROP_PARSE_STREAMS,
}; };
#define CUSTOM_EOS_QUARK _custom_eos_quark_get () #define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
@ -289,7 +310,6 @@ static void handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad,
static gboolean setup_typefind (ChildSrcPadInfo * info); static gboolean setup_typefind (ChildSrcPadInfo * info);
static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad); static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info, static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info,
gboolean do_download, gboolean is_adaptive, gboolean no_buffering,
GstPad * originating_pad); GstPad * originating_pad);
static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc); static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
static void free_output_slot_async (GstURISourceBin * urisrc, static void free_output_slot_async (GstURISourceBin * urisrc,
@ -423,6 +443,20 @@ gst_uri_source_bin_class_init (GstURISourceBinClass * klass)
"this element", GST_TYPE_STRUCTURE, "this element", GST_TYPE_STRUCTURE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin:parse-streams:
*
* A `parsebin` element will be used on all non-raw streams, and urisourcebin
* will output the elementary streams. Recommended when buffering is used
* since it will provide accurate buffering levels.
*
* Since: 1.22
*/
g_object_class_install_property (gobject_class, PROP_PARSE_STREAMS,
g_param_spec_boolean ("parse-streams", "Parse Streams",
"Extract the elementary streams of non-raw sources",
DEFAULT_PARSE_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/** /**
* GstURISourceBin::drained: * GstURISourceBin::drained:
* *
@ -560,6 +594,9 @@ gst_uri_source_bin_set_property (GObject * object, guint prop_id,
urisrc->high_watermark = g_value_get_double (value); urisrc->high_watermark = g_value_get_double (value);
update_queue_values (urisrc); update_queue_values (urisrc);
break; break;
case PROP_PARSE_STREAMS:
urisrc->parse_streams = g_value_get_boolean (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -616,6 +653,9 @@ gst_uri_source_bin_get_property (GObject * object, guint prop_id,
case PROP_STATISTICS: case PROP_STATISTICS:
g_value_take_boxed (value, get_queue_statistics (urisrc)); g_value_take_boxed (value, get_queue_statistics (urisrc));
break; break;
case PROP_PARSE_STREAMS:
g_value_set_boolean (value, urisrc->parse_streams);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -659,6 +699,19 @@ free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc)
g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc); g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc);
g_list_free (info->outputs); g_list_free (info->outputs);
if (info->multiqueue) {
GST_DEBUG_OBJECT (urisrc, "Removing multiqueue");
gst_element_set_state (info->multiqueue, GST_STATE_NULL);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->multiqueue));
gst_bin_remove (GST_BIN_CAST (urisrc), info->multiqueue);
}
if (info->pre_parse_queue) {
gst_element_set_state (info->pre_parse_queue, GST_STATE_NULL);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->pre_parse_queue));
gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue);
}
g_free (info); g_free (info);
} }
@ -700,17 +753,14 @@ new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
OutputSlotInfo *slot; OutputSlotInfo *slot;
GstPad *output_pad; GstPad *output_pad;
GST_DEBUG_OBJECT (element, "New pad %" GST_PTR_FORMAT, pad);
GST_URI_SOURCE_BIN_LOCK (urisrc); GST_URI_SOURCE_BIN_LOCK (urisrc);
/* If the demuxer handles buffering and is streams-aware, we can expose it /* If the demuxer handles buffering and is streams-aware, we can expose it
as-is directly. We still add an event probe to deal with EOS */ as-is directly. We still add an event probe to deal with EOS */
slot = slot = new_output_slot (info, pad);
new_output_slot (info, FALSE, FALSE, info->demuxer_handles_buffering,
pad);
output_pad = gst_object_ref (slot->output_pad); output_pad = gst_object_ref (slot->output_pad);
GST_DEBUG_OBJECT (element,
"New streams-aware demuxer pad %s:%s , exposing directly",
GST_DEBUG_PAD_NAME (pad));
slot->demuxer_event_probe_id = slot->demuxer_event_probe_id =
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events, GST_PAD_PROBE_TYPE_EVENT_FLUSH, (GstPadProbeCallback) demux_pad_events,
@ -968,27 +1018,104 @@ on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec,
(GstElementCallAsyncFunc) update_queue_values, NULL, NULL); (GstElementCallAsyncFunc) update_queue_values, NULL, NULL);
} }
static void
setup_downloadbuffer (GstURISourceBin * urisrc, GstElement * downloadbuffer)
{
gchar *temp_template, *filename;
const gchar *tmp_dir, *prgname;
tmp_dir = g_get_user_cache_dir ();
prgname = g_get_prgname ();
if (prgname == NULL)
prgname = "GStreamer";
filename = g_strdup_printf ("%s-XXXXXX", prgname);
/* build our filename */
temp_template = g_build_filename (tmp_dir, filename, NULL);
GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
temp_template, tmp_dir, prgname, filename);
/* configure progressive download for selected media types */
g_object_set (downloadbuffer, "temp-template", temp_template, NULL);
g_free (filename);
g_free (temp_template);
}
static void
setup_multiqueue (GstURISourceBin * urisrc, ChildSrcPadInfo * info,
GstElement * multiqueue)
{
if (info->use_downloadbuffer) {
/* If we have a downloadbuffer we will let that one deal with buffering,
and we only use multiqueue for dealing with interleave */
g_object_set (info->multiqueue, "use-buffering", FALSE, NULL);
} else {
/* Else we set the minimum interleave time of multiqueue to the required
* buffering duration and ask it to report buffering */
g_object_set (info->multiqueue, "use-buffering", TRUE,
"min-interleave-time", GET_BUFFER_DURATION (urisrc), NULL);
}
/* Common properties */
g_object_set (info->multiqueue,
"sync-by-running-time", TRUE,
"use-interleave", TRUE,
"max-size-bytes", 0,
"max-size-buffers", 0,
"low-watermark", urisrc->low_watermark,
"high-watermark", urisrc->high_watermark, NULL);
gst_bin_add (GST_BIN_CAST (urisrc), info->multiqueue);
gst_element_sync_state_with_parent (info->multiqueue);
}
/* Called with lock held */ /* Called with lock held */
static OutputSlotInfo * static OutputSlotInfo *
new_output_slot (ChildSrcPadInfo * info, gboolean do_download, new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
gboolean is_adaptive, gboolean no_buffering, GstPad * originating_pad)
{ {
GstURISourceBin *urisrc = info->urisrc; GstURISourceBin *urisrc = info->urisrc;
OutputSlotInfo *slot; OutputSlotInfo *slot;
GstPad *srcpad; GstPad *srcpad;
GstElement *queue = NULL; GstElement *queue = NULL;
const gchar *elem_name; const gchar *elem_name;
gboolean use_downloadbuffer;
GST_DEBUG_OBJECT (urisrc, GST_DEBUG_OBJECT (urisrc,
"do_download:%d is_adaptive:%d, no_buffering:%d, originating_pad:%" "use_queue2:%d use_downloadbuffer:%d, demuxer:%d, originating_pad:%"
GST_PTR_FORMAT, do_download, is_adaptive, no_buffering, originating_pad); GST_PTR_FORMAT, info->use_queue2, info->use_downloadbuffer,
info->demuxer != NULL, originating_pad);
slot = g_new0 (OutputSlotInfo, 1); slot = g_new0 (OutputSlotInfo, 1);
slot->linked_info = info; slot->linked_info = info;
/* If buffering is required, create the element */ /* If a demuxer/parsebin is present, then the downloadbuffer will have been handled before that */
if (!no_buffering) { use_downloadbuffer = info->use_downloadbuffer && !info->demuxer;
if (do_download)
/* If parsebin is used and buffering is required, we go through a multiqueue */
if (urisrc->parse_streams && (info->use_queue2 || info->use_downloadbuffer)) {
GST_DEBUG_OBJECT (urisrc, "Using multiqueue");
if (!info->multiqueue) {
GST_DEBUG_OBJECT (urisrc,
"Creating multiqueue for buffering elementary streams");
elem_name = "multiqueue";
info->multiqueue = gst_element_factory_make (elem_name, NULL);
if (!info->multiqueue)
goto no_buffer_element;
setup_multiqueue (urisrc, info, info->multiqueue);
}
slot->queue_sinkpad =
gst_element_request_pad_simple (info->multiqueue, "sink_%u");
srcpad = gst_pad_get_single_internal_link (slot->queue_sinkpad);
slot->output_pad = create_output_pad (slot, srcpad);
gst_object_unref (srcpad);
gst_pad_link (originating_pad, slot->queue_sinkpad);
}
/* If buffering is required, create the element. If downloadbuffer is
* required, it will take precedence over queue2 */
else if (use_downloadbuffer || info->use_queue2) {
if (use_downloadbuffer)
elem_name = "downloadbuffer"; elem_name = "downloadbuffer";
else else
elem_name = "queue2"; elem_name = "queue2";
@ -1003,40 +1130,23 @@ new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
g_signal_connect (G_OBJECT (queue), "notify::bitrate", g_signal_connect (G_OBJECT (queue), "notify::bitrate",
(GCallback) on_queue_bitrate_changed, urisrc); (GCallback) on_queue_bitrate_changed, urisrc);
if (do_download) { if (use_downloadbuffer) {
gchar *temp_template, *filename; setup_downloadbuffer (urisrc, slot->queue);
const gchar *tmp_dir, *prgname;
tmp_dir = g_get_user_cache_dir ();
prgname = g_get_prgname ();
if (prgname == NULL)
prgname = "GStreamer";
filename = g_strdup_printf ("%s-XXXXXX", prgname);
/* build our filename */
temp_template = g_build_filename (tmp_dir, filename, NULL);
GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
temp_template, tmp_dir, prgname, filename);
/* configure progressive download for selected media types */
g_object_set (queue, "temp-template", temp_template, NULL);
g_free (filename);
g_free (temp_template);
} else { } else {
if (is_adaptive) { g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
GST_LOG_OBJECT (urisrc, "Adding queue2 for adaptive streaming stream"); if (info->demuxer) {
g_object_set (queue, "use-buffering", urisrc->use_buffering, /* If a adaptive demuxer or parsebin is used, use more accurate information */
"use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL); g_object_set (queue, "use-tags-bitrate", TRUE, "use-rate-estimate",
FALSE, NULL);
} else { } else {
GST_LOG_OBJECT (urisrc, "Adding queue for buffering"); GST_DEBUG_OBJECT (queue,
g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL); "Setting ring-buffer-max-size %" G_GUINT64_FORMAT,
urisrc->ring_buffer_max_size);
/* Else allow ring-buffer-max-size setting to be used */
g_object_set (queue, "ring-buffer-max-size",
urisrc->ring_buffer_max_size, NULL);
} }
g_object_set (queue, "ring-buffer-max-size",
urisrc->ring_buffer_max_size, NULL);
/* Disable max-size-buffers - queue based on data rate to the default time limit */ /* Disable max-size-buffers - queue based on data rate to the default time limit */
g_object_set (queue, "max-size-buffers", 0, NULL); g_object_set (queue, "max-size-buffers", 0, NULL);
@ -1075,8 +1185,9 @@ new_output_slot (ChildSrcPadInfo * info, gboolean do_download,
/* save output slot so we can remove it later */ /* save output slot so we can remove it later */
info->outputs = g_list_append (info->outputs, slot); info->outputs = g_list_append (info->outputs, slot);
GST_DEBUG_OBJECT (urisrc, "New slot for output_pad %" GST_PTR_FORMAT, GST_DEBUG_OBJECT (urisrc,
slot->output_pad); "New output_pad %" GST_PTR_FORMAT " for originating pad %" GST_PTR_FORMAT,
slot->output_pad, originating_pad);
return slot; return slot;
@ -1550,11 +1661,13 @@ analyse_pad_foreach (const GValue * item, AnalyseData * data)
OutputSlotInfo *slot; OutputSlotInfo *slot;
GST_URI_SOURCE_BIN_LOCK (urisrc); GST_URI_SOURCE_BIN_LOCK (urisrc);
/* Only use buffering on raw pads in very specific conditions */ /* Only use buffering (via queue2) on raw pads in very specific
* conditions */
info->use_queue2 = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri);
GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d", GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d",
urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri)); urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri));
slot = new_output_slot (info, FALSE, FALSE, !urisrc->use_buffering slot = new_output_slot (info, pad);
|| !IS_QUEUE_URI (urisrc->uri), pad);
if (!slot) { if (!slot) {
res = FALSE; res = FALSE;
@ -1732,6 +1845,99 @@ no_demuxer:
} }
} }
static gboolean
setup_parsebin_for_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
{
GstURISourceBin *urisrc = info->urisrc;
GstPad *sinkpad;
GstPadLinkReturn link_res;
GST_DEBUG_OBJECT (urisrc, "Setting up parsebin for %" GST_PTR_FORMAT,
originating_pad);
GST_STATE_LOCK (urisrc);
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* Set up optional pre-parsebin download/ringbuffer elements */
if (info->use_downloadbuffer || urisrc->ring_buffer_max_size) {
if (info->use_downloadbuffer) {
GST_DEBUG_OBJECT (urisrc, "Setting up pre-parsebin downloadbuffer");
info->pre_parse_queue = gst_element_factory_make ("downloadbuffer", NULL);
setup_downloadbuffer (urisrc, info->pre_parse_queue);
g_object_set (info->pre_parse_queue, "max-size-bytes",
GET_BUFFER_SIZE (urisrc), "max-size-time",
(guint64) GET_BUFFER_DURATION (urisrc), NULL);
} else if (urisrc->ring_buffer_max_size) {
/* If a ring-buffer-max-size is specified with parsebin, we set it up on
* the queue2 *before* parsebin. We will use its buffering levels instead
* of the ones from multiqueue */
GST_DEBUG_OBJECT (urisrc,
"Setting up pre-parsebin queue2 for ring-buffer-max-size %"
G_GUINT64_FORMAT, urisrc->ring_buffer_max_size);
info->pre_parse_queue = gst_element_factory_make ("queue2", NULL);
/* We do not use this queue2 for buffering levels, but the multiqueue */
g_object_set (info->pre_parse_queue, "use-buffering", FALSE,
"ring-buffer-max-size", urisrc->ring_buffer_max_size,
"max-size-buffers", 0, NULL);
}
gst_element_set_locked_state (info->pre_parse_queue, TRUE);
gst_bin_add (GST_BIN_CAST (urisrc), info->pre_parse_queue);
sinkpad = gst_element_get_static_pad (info->pre_parse_queue, "sink");
link_res = gst_pad_link (originating_pad, sinkpad);
gst_object_unref (sinkpad);
if (link_res != GST_PAD_LINK_OK)
goto could_not_link;
}
info->demuxer = gst_element_factory_make ("parsebin", NULL);
if (!info->demuxer) {
post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "parsebin");
return FALSE;
}
gst_element_set_locked_state (info->demuxer, TRUE);
gst_bin_add (GST_BIN_CAST (urisrc), info->demuxer);
if (info->pre_parse_queue) {
if (!gst_element_link_pads (info->pre_parse_queue, "src", info->demuxer,
"sink"))
goto could_not_link;
} else {
sinkpad = gst_element_get_static_pad (info->demuxer, "sink");
link_res = gst_pad_link (originating_pad, sinkpad);
gst_object_unref (sinkpad);
if (link_res != GST_PAD_LINK_OK)
goto could_not_link;
}
/* set up callbacks to create the links between parsebin and output */
g_signal_connect (info->demuxer,
"pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info);
g_signal_connect (info->demuxer,
"pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info);
if (info->pre_parse_queue) {
gst_element_set_locked_state (info->pre_parse_queue, FALSE);
gst_element_sync_state_with_parent (info->pre_parse_queue);
}
gst_element_set_locked_state (info->demuxer, FALSE);
gst_element_sync_state_with_parent (info->demuxer);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
GST_STATE_UNLOCK (urisrc);
return TRUE;
could_not_link:
{
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
GST_STATE_UNLOCK (urisrc);
GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
(NULL), ("Can't link to (pre-)parsebin element"));
return FALSE;
}
}
/* Called when: /* Called when:
* * Source element adds a new pad * * Source element adds a new pad
* * typefind has found a type * * typefind has found a type
@ -1743,7 +1949,6 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
gboolean is_raw; gboolean is_raw;
GstStructure *s; GstStructure *s;
const gchar *media_type; const gchar *media_type;
gboolean do_download = FALSE;
GST_URI_SOURCE_BIN_LOCK (urisrc); GST_URI_SOURCE_BIN_LOCK (urisrc);
@ -1754,7 +1959,7 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
", exposing", caps); ", exposing", caps);
slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad); slot = new_output_slot (info, srcpad);
output_pad = gst_object_ref (slot->output_pad); output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc); GST_URI_SOURCE_BIN_UNLOCK (urisrc);
@ -1781,7 +1986,8 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
/* Query the demuxer to see if it can handle buffering */ /* Query the demuxer to see if it can handle buffering */
query = gst_query_new_buffering (GST_FORMAT_TIME); query = gst_query_new_buffering (GST_FORMAT_TIME);
info->demuxer_handles_buffering = gst_element_query (info->demuxer, query); info->use_queue2 = urisrc->use_buffering
&& !gst_element_query (info->demuxer, query);
gst_query_unref (query); gst_query_unref (query);
GST_DEBUG_OBJECT (urisrc, "Demuxer handles buffering : %d", GST_DEBUG_OBJECT (urisrc, "Demuxer handles buffering : %d",
info->demuxer_handles_buffering); info->demuxer_handles_buffering);
@ -1798,45 +2004,55 @@ handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
gst_element_sync_state_with_parent (info->demuxer); gst_element_sync_state_with_parent (info->demuxer);
} else if (!urisrc->is_stream) { } else if (!urisrc->is_stream) {
OutputSlotInfo *slot; if (urisrc->parse_streams) {
GstPad *output_pad; /* GST_URI_SOURCE_BIN_LOCK (urisrc); */
setup_parsebin_for_slot (info, srcpad);
/* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */
} else {
/* We don't need buffering here, expose immediately */
OutputSlotInfo *slot;
GstPad *output_pad;
/* We don't need buffering here, expose immediately */ GST_URI_SOURCE_BIN_LOCK (urisrc);
GST_URI_SOURCE_BIN_LOCK (urisrc); slot = new_output_slot (info, srcpad);
slot = new_output_slot (info, FALSE, FALSE, TRUE, srcpad); output_pad = gst_object_ref (slot->output_pad);
output_pad = gst_object_ref (slot->output_pad); GST_URI_SOURCE_BIN_UNLOCK (urisrc);
GST_URI_SOURCE_BIN_UNLOCK (urisrc); expose_output_pad (urisrc, output_pad);
expose_output_pad (urisrc, output_pad); gst_object_unref (output_pad);
gst_object_unref (output_pad); }
} else { } else {
OutputSlotInfo *slot;
GstPad *output_pad;
/* only enable download buffering if the upstream duration is known */ /* only enable download buffering if the upstream duration is known */
if (urisrc->download) { if (urisrc->download) {
GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES); GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES);
if (gst_pad_query (srcpad, query)) { if (gst_pad_query (srcpad, query)) {
gint64 dur; gint64 dur;
gst_query_parse_duration (query, NULL, &dur); gst_query_parse_duration (query, NULL, &dur);
do_download = (dur != -1); info->use_downloadbuffer = (dur != -1);
} }
gst_query_unref (query); gst_query_unref (query);
} }
info->use_queue2 = urisrc->use_buffering;
GST_DEBUG_OBJECT (urisrc, "check media-type %s, do_download:%d", media_type, if (urisrc->parse_streams) {
do_download); /* GST_URI_SOURCE_BIN_LOCK (urisrc); */
setup_parsebin_for_slot (info, srcpad);
/* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */
} else {
OutputSlotInfo *slot;
GstPad *output_pad;
GST_URI_SOURCE_BIN_LOCK (urisrc); GST_URI_SOURCE_BIN_LOCK (urisrc);
slot = new_output_slot (info, do_download, FALSE, FALSE, srcpad); slot = new_output_slot (info, srcpad);
gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
pre_queue_event_probe, urisrc, NULL); pre_queue_event_probe, urisrc, NULL);
output_pad = gst_object_ref (slot->output_pad); output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc); GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, output_pad); expose_output_pad (urisrc, output_pad);
gst_object_unref (output_pad); gst_object_unref (output_pad);
}
} }
return; return;
@ -1946,8 +2162,12 @@ free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
gst_element_set_state (slot->queue, GST_STATE_NULL); gst_element_set_state (slot->queue, GST_STATE_NULL);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue)); remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue); gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
}
gst_object_unref (slot->queue_sinkpad); if (slot->queue_sinkpad) {
if (slot->linked_info && slot->linked_info->multiqueue)
gst_element_release_request_pad (slot->linked_info->multiqueue,
slot->queue_sinkpad);
gst_object_replace ((GstObject **) & slot->queue_sinkpad, NULL);
} }
if (slot->demuxer_event_probe_id) if (slot->demuxer_event_probe_id)
@ -2237,7 +2457,6 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
return; return;
} }
g_mutex_lock (&urisrc->buffering_post_lock); g_mutex_lock (&urisrc->buffering_post_lock);
/* /*
@ -2379,6 +2598,20 @@ handle_message (GstBin * bin, GstMessage * msg)
} }
break; break;
} }
case GST_MESSAGE_STREAM_COLLECTION:
/* We only want to forward stream collection from the source element *OR*
* from adaptive demuxers. We do not want to forward them from the
* potential parsebins since there might be many and require aggregation
* to be useful/coherent. */
if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source
&& !urisrc->is_adaptive) {
GST_DEBUG_OBJECT (bin,
"Dropping stream-collection from non-adaptive-demuxer %"
GST_PTR_FORMAT, GST_MESSAGE_SRC (msg));
gst_message_unref (msg);
msg = NULL;
}
break;
case GST_MESSAGE_BUFFERING: case GST_MESSAGE_BUFFERING:
handle_buffering_message (urisrc, msg); handle_buffering_message (urisrc, msg);
msg = NULL; msg = NULL;