gstreamer/subprojects/gst-plugins-base/gst/playback/gsturisourcebin.c
Edward Hervey f7d6822589 urisourcebin: Avoid deadlock on shutdown
The reason why the STATE lock was taken was to avoid issues where we would be
adding (and activating) elements at the same time as urisourcebin would be
brought down to READY. That would cause those new elements to potentially return
ERRORS because of not-negotiated/flushing-pads

But that creates a really bad deadlock (state lock is taken to deactivate the
streaming thread which .. is currently grabbing the state lock).

Instead, we can just ignore the warning/error messages that might occur when
shutting down.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/4075

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/8162>
2024-12-20 08:01:01 +00:00

3474 lines
108 KiB
C

/* GStreamer
* Copyright (C) <2015> Jan Schmidt <jan@centricular.com>
* Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-urisourcebin
* @title: urisourcebin
*
* urisourcebin is an element for accessing URIs in a uniform manner.
*
* It handles selecting a URI source element and potentially download
* buffering for network sources. It produces one or more source pads,
* depending on the input source, for feeding to decoding chains or decodebin.
*
* The main configuration is via the #GstURISourceBin:uri property.
*
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <string.h>
#include <gst/gst.h>
#include <glib/gi18n-lib.h>
#include <gst/pbutils/missing-plugins.h>
#include "gstplay-enum.h"
#include "gstrawcaps.h"
#include "gstplaybackelements.h"
#include "gstplaybackutils.h"
#define GST_TYPE_URI_SOURCE_BIN \
(gst_uri_source_bin_get_type())
#define GST_URI_SOURCE_BIN(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_URI_SOURCE_BIN,GstURISourceBin))
#define GST_URI_SOURCE_BIN_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_URI_SOURCE_BIN,GstURISourceBinClass))
#define GST_IS_URI_SOURCE_BIN(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_URI_SOURCE_BIN))
#define GST_IS_URI_SOURCE_BIN_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_URI_SOURCE_BIN))
#define GST_URI_SOURCE_BIN_CAST(obj) ((GstURISourceBin *) (obj))
typedef struct _GstURISourceBin GstURISourceBin;
typedef struct _GstURISourceBinClass GstURISourceBinClass;
typedef struct _ChildSrcPadInfo ChildSrcPadInfo;
typedef struct _OutputSlotInfo OutputSlotInfo;
#define GST_URI_SOURCE_BIN_LOCK(urisrc) (g_mutex_lock(&((GstURISourceBin*)(urisrc))->lock))
#define GST_URI_SOURCE_BIN_UNLOCK(urisrc) (g_mutex_unlock(&((GstURISourceBin*)(urisrc))->lock))
#define BUFFERING_LOCK(ubin) G_STMT_START { \
GST_LOG_OBJECT (ubin, \
"buffering locking from thread %p", \
g_thread_self ()); \
g_mutex_lock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \
GST_LOG_OBJECT (ubin, \
"buffering lock from thread %p", \
g_thread_self ()); \
} G_STMT_END
#define BUFFERING_UNLOCK(ubin) G_STMT_START { \
GST_LOG_OBJECT (ubin, \
"buffering unlocking from thread %p", \
g_thread_self ()); \
g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \
} G_STMT_END
/* Track a source pad from the source element and the chain of (optional)
* elements that are linked to it up to the output slots */
struct _ChildSrcPadInfo
{
GstURISourceBin *urisrc;
/* Source pad this info is attached to (reffed) */
GstPad *src_pad;
/* An optional typefind */
GstElement *typefind;
/* Pre-parsebin buffering elements. Only present is parse-streams and
* downloading *or* ring-buffer-max-size */
GstElement *pre_parse_queue;
/* Post-parsebin multiqueue. Only present if parse-streams and buffering is
* required */
GstElement *multiqueue;
/* An optional demuxer or parsebin */
GstElement *demuxer;
gboolean demuxer_handles_buffering;
gboolean demuxer_streams_aware;
gboolean demuxer_is_parsebin;
/* list of output slots */
GList *outputs;
/* The following fields specify how this output should be handled */
/* use_downloadbuffer : TRUE if the content from the source should be
* downloaded with a downloadbuffer element */
gboolean use_downloadbuffer;
/* use_queue2: TRUE if the contents should be buffered through a queue2
* element */
gboolean use_queue2;
/* Current StreamCollection */
GstStreamCollection *collection;
};
/* Output Slot:
*
* Handles everything related to outputing, including optional buffering.
*/
struct _OutputSlotInfo
{
ChildSrcPadInfo *linked_info; /* source pad info feeding this slot */
GstStream *stream; /* The current stream */
GstStream *pending_stream; /* The stream this slot should switch to */
GstPad *originating_pad; /* Pad that created this OutputSlotInfo (ref held) */
GstPad *pending_pad; /* Pad this slot should use once originating_pad goes away (ref held) */
GstPad *output_pad; /* Output ghost pad */
gboolean is_eos; /* Did EOS get fed into the buffering element */
GstElement *queue; /* queue2 or downloadbuffer */
GstPad *queue_sinkpad; /* Sink pad of the queue eleemnt */
gulong bitrate_changed_id; /* queue bitrate changed notification */
guint demuxer_event_probe_id;
guint pending_probe_id; /* demuxer_event_probe_id for pending_pad */
};
/**
* GstURISourceBin
*
* urisourcebin element struct
*/
struct _GstURISourceBin
{
GstBin parent_instance;
GMutex lock; /* lock for constructing */
gchar *uri;
guint64 connection_speed;
gboolean activated; /* TRUE if the switch to PAUSED has been completed */
gint flushing; /* TRUE if switching from PAUSED to READY (atomic int) */
GCond activation_cond; /* Uses the urisourcebin lock */
gboolean is_stream;
gboolean is_adaptive;
guint64 buffer_duration; /* When buffering, buffer duration (ns) */
guint buffer_size; /* When buffering, buffer size (bytes) */
gboolean download;
gchar *download_dir;
gboolean use_buffering;
gdouble low_watermark;
gdouble high_watermark;
gboolean parse_streams;
GstElement *source;
GList *src_infos; /* List of ChildSrcPadInfo for the source */
guint numpads;
/* for dynamic sources */
guint src_np_sig_id; /* new-pad signal id */
guint64 ring_buffer_max_size; /* 0 means disabled */
GList *buffering_status; /* element currently buffering messages */
gint last_buffering_pct; /* Avoid sending buffering over and over */
GMutex buffering_lock;
GMutex buffering_post_lock;
/* Current output collection (if provided by internal elements) */
GstStreamCollection *collection;
};
struct _GstURISourceBinClass
{
GstBinClass parent_class;
/* emitted when all data has been drained out
* FIXME : What do we need this for ?? */
void (*drained) (GstElement * element);
/* emitted when all data has been fed into buffering slots (i.e the
* actual sources are done) */
void (*about_to_finish) (GstElement * element);
};
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS_ANY);
static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
GST_DEBUG_CATEGORY_STATIC (gst_uri_source_bin_debug);
#define GST_CAT_DEFAULT gst_uri_source_bin_debug
/* signals */
enum
{
SIGNAL_DRAINED,
SIGNAL_ABOUT_TO_FINISH,
SIGNAL_SOURCE_SETUP,
LAST_SIGNAL
};
/* properties */
#define DEFAULT_PROP_URI NULL
#define DEFAULT_PROP_SOURCE NULL
#define DEFAULT_CONNECTION_SPEED 0
#define DEFAULT_BUFFER_DURATION -1
#define DEFAULT_BUFFER_SIZE -1
#define DEFAULT_DOWNLOAD FALSE
#define DEFAULT_USE_BUFFERING TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
#define DEFAULT_LOW_WATERMARK 0.01
#define DEFAULT_HIGH_WATERMARK 0.60
#define DEFAULT_PARSE_STREAMS FALSE
#define ACTUAL_DEFAULT_BUFFER_SIZE 10 * 1024 * 1024 /* The value used for byte limits when buffer-size == -1 */
#define ACTUAL_DEFAULT_BUFFER_DURATION 5 * GST_SECOND /* The value used for time limits when buffer-duration == -1 */
#define GET_BUFFER_SIZE(u) ((u)->buffer_size == -1 ? ACTUAL_DEFAULT_BUFFER_SIZE : (u)->buffer_size)
#define GET_BUFFER_DURATION(u) ((u)->buffer_duration == -1 ? ACTUAL_DEFAULT_BUFFER_DURATION : (u)->buffer_duration)
#define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
enum
{
PROP_0,
PROP_URI,
PROP_SOURCE,
PROP_CONNECTION_SPEED,
PROP_BUFFER_SIZE,
PROP_BUFFER_DURATION,
PROP_DOWNLOAD,
PROP_DOWNLOAD_DIR,
PROP_USE_BUFFERING,
PROP_RING_BUFFER_MAX_SIZE,
PROP_LOW_WATERMARK,
PROP_HIGH_WATERMARK,
PROP_STATISTICS,
PROP_PARSE_STREAMS,
};
#define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
#define CUSTOM_EOS_QUARK_DATA "custom-eos"
static GQuark
_custom_eos_quark_get (void)
{
static gsize g_quark;
if (g_once_init_enter (&g_quark)) {
gsize quark =
(gsize) g_quark_from_static_string ("urisourcebin-custom-eos");
g_once_init_leave (&g_quark, quark);
}
return g_quark;
}
static void post_missing_plugin_error (GstElement * urisrc,
const gchar * element_name);
static guint gst_uri_source_bin_signals[LAST_SIGNAL] = { 0 };
GType gst_uri_source_bin_get_type (void);
#define gst_uri_source_bin_parent_class parent_class
G_DEFINE_TYPE (GstURISourceBin, gst_uri_source_bin, GST_TYPE_BIN);
#define _do_init \
GST_DEBUG_CATEGORY_INIT (gst_uri_source_bin_debug, "urisourcebin", 0, "URI source element"); \
playback_element_init (plugin);
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (urisourcebin, "urisourcebin",
GST_RANK_NONE, GST_TYPE_URI_SOURCE_BIN, _do_init);
static void gst_uri_source_bin_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_uri_source_bin_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_uri_source_bin_finalize (GObject * obj);
static void handle_message (GstBin * bin, GstMessage * msg);
static gboolean gst_uri_source_bin_query (GstElement * element,
GstQuery * query);
static GstStateChangeReturn gst_uri_source_bin_change_state (GstElement *
element, GstStateChange transition);
static void handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad,
GstCaps * caps);
static gboolean setup_typefind (ChildSrcPadInfo * info);
static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
static OutputSlotInfo *new_output_slot (ChildSrcPadInfo * info,
GstPad * originating_pad);
static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
static void free_output_slot_async (GstURISourceBin * urisrc,
OutputSlotInfo * slot);
static GstPad *create_output_pad (OutputSlotInfo * slot, GstPad * pad);
static void remove_buffering_msgs (GstURISourceBin * bin, GstObject * src);
static void update_queue_values (GstURISourceBin * urisrc);
static GstStructure *get_queue_statistics (GstURISourceBin * urisrc);
static void
gst_uri_source_bin_class_init (GstURISourceBinClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
GstBinClass *gstbin_class;
gobject_class = G_OBJECT_CLASS (klass);
gstelement_class = GST_ELEMENT_CLASS (klass);
gstbin_class = GST_BIN_CLASS (klass);
gobject_class->set_property = gst_uri_source_bin_set_property;
gobject_class->get_property = gst_uri_source_bin_get_property;
gobject_class->finalize = gst_uri_source_bin_finalize;
g_object_class_install_property (gobject_class, PROP_URI,
g_param_spec_string ("uri", "URI", "URI to decode",
DEFAULT_PROP_URI, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SOURCE,
g_param_spec_object ("source", "Source", "Source object used",
GST_TYPE_ELEMENT, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
g_param_spec_uint64 ("connection-speed", "Connection Speed",
"Network connection speed in kbps (0 = unknown)",
0, G_MAXUINT64 / 1000, DEFAULT_CONNECTION_SPEED,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
g_param_spec_int ("buffer-size", "Buffer size (bytes)",
"Buffer size when buffering streams (-1 default value)",
-1, G_MAXINT, DEFAULT_BUFFER_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BUFFER_DURATION,
g_param_spec_int64 ("buffer-duration", "Buffer duration (ns)",
"Buffer duration when buffering streams (-1 default value)",
-1, G_MAXINT64, DEFAULT_BUFFER_DURATION,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin::download:
*
* For certain media type, enable download buffering.
*/
g_object_class_install_property (gobject_class, PROP_DOWNLOAD,
g_param_spec_boolean ("download", "Download",
"Attempt download buffering when buffering network streams",
DEFAULT_DOWNLOAD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin:download-dir:
*
* The directory where buffers are downloaded to, if 'download' is enabled.
* If not set (default), the XDG cache directory is used.
*
* Since: 1.24
*/
g_object_class_install_property (gobject_class, PROP_DOWNLOAD_DIR,
g_param_spec_string ("download-dir", "Download Directory",
"The directory where buffers are downloaded to, if 'download' is enabled. "
"If not set (default), the XDG cache directory is used.",
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin::use-buffering:
*
* Perform buffering using a queue2 element, and emit BUFFERING
* messages based on low-/high-percent thresholds of streaming data,
* such as adaptive-demuxer streams.
*
* When download buffering is activated and used for the current media
* type, this property does nothing.
*
*/
g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
g_param_spec_boolean ("use-buffering", "Use Buffering",
"Perform buffering on demuxed/parsed media",
DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin::ring-buffer-max-size
*
* The maximum size of the ring buffer in kilobytes. If set to 0, the ring
* buffer is disabled. Default is 0.
*
*/
g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
g_param_spec_uint64 ("ring-buffer-max-size",
"Max. ring buffer size (bytes)",
"Max. amount of data in the ring buffer (bytes, 0 = ring buffer disabled)",
0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin::low-watermark
*
* Proportion of the queue size (either in bytes or time) for buffering
* to restart when crossed from above. Only used if use-buffering is TRUE.
*/
g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
g_param_spec_double ("low-watermark", "Low watermark",
"Low threshold for buffering to start. Only used if use-buffering is True",
0.0, 1.0, DEFAULT_LOW_WATERMARK,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin::high-watermark
*
* Proportion of the queue size (either in bytes or time) to complete
* buffering. Only used if use-buffering is TRUE.
*/
g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
g_param_spec_double ("high-watermark", "High watermark",
"High threshold for buffering to finish. Only used if use-buffering is True",
0.0, 1.0, DEFAULT_HIGH_WATERMARK,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin::statistics
*
* A GStructure containing the following values based on the values from
* all the queue's contained in this urisourcebin.
*
* "minimum-byte-level" G_TYPE_UINT Minimum of the current byte levels
* "maximum-byte-level" G_TYPE_UINT Maximum of the current byte levels
* "average-byte-level" G_TYPE_UINT Average of the current byte levels
* "minimum-time-level" G_TYPE_UINT64 Minimum of the current time levels
* "maximum-time-level" G_TYPE_UINT64 Maximum of the current time levels
* "average-time-level" G_TYPE_UINT64 Average of the current time levels
*/
g_object_class_install_property (gobject_class, PROP_STATISTICS,
g_param_spec_boxed ("statistics", "Queue Statistics",
"A set of statistics over all the queue-like elements contained in "
"this element", GST_TYPE_STRUCTURE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin:parse-streams:
*
* A `parsebin` element will be used on all non-raw streams, and urisourcebin
* will output the elementary streams. Recommended when buffering is used
* since it will provide accurate buffering levels.
*
* Since: 1.22
*/
g_object_class_install_property (gobject_class, PROP_PARSE_STREAMS,
g_param_spec_boolean ("parse-streams", "Parse Streams",
"Extract the elementary streams of non-raw sources",
DEFAULT_PARSE_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstURISourceBin::drained:
*
* This signal is emitted when the data for the current uri is played.
*/
gst_uri_source_bin_signals[SIGNAL_DRAINED] =
g_signal_new ("drained", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstURISourceBinClass, drained), NULL, NULL, NULL,
G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstURISourceBin::about-to-finish:
*
* This signal is emitted when the data for the current uri is played.
*/
gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH] =
g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstURISourceBinClass, about_to_finish), NULL, NULL, NULL,
G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstURISourceBin::source-setup:
* @bin: the urisourcebin.
* @source: source element
*
* This signal is emitted after the source element has been created, so
* it can be configured by setting additional properties (e.g. set a
* proxy server for an http source, or set the device and read speed for
* an audio cd source). This is functionally equivalent to connecting to
* the notify::source signal, but more convenient.
*
* Since: 1.6.1
*/
gst_uri_source_bin_signals[SIGNAL_SOURCE_SETUP] =
g_signal_new ("source-setup", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&srctemplate));
gst_element_class_set_static_metadata (gstelement_class,
"URI reader", "Generic/Bin/Source",
"Download and buffer a URI as needed",
"Jan Schmidt <jan@centricular.com>");
gstelement_class->query = GST_DEBUG_FUNCPTR (gst_uri_source_bin_query);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_uri_source_bin_change_state);
gstbin_class->handle_message = GST_DEBUG_FUNCPTR (handle_message);
}
static void
gst_uri_source_bin_init (GstURISourceBin * urisrc)
{
g_mutex_init (&urisrc->lock);
g_mutex_init (&urisrc->buffering_lock);
g_mutex_init (&urisrc->buffering_post_lock);
g_cond_init (&urisrc->activation_cond);
urisrc->uri = g_strdup (DEFAULT_PROP_URI);
urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
urisrc->buffer_duration = DEFAULT_BUFFER_DURATION;
urisrc->buffer_size = DEFAULT_BUFFER_SIZE;
urisrc->download = DEFAULT_DOWNLOAD;
urisrc->use_buffering = DEFAULT_USE_BUFFERING;
urisrc->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
urisrc->last_buffering_pct = -1;
urisrc->low_watermark = DEFAULT_LOW_WATERMARK;
urisrc->high_watermark = DEFAULT_HIGH_WATERMARK;
GST_OBJECT_FLAG_SET (urisrc,
GST_ELEMENT_FLAG_SOURCE | GST_BIN_FLAG_STREAMS_AWARE);
gst_bin_set_suppressed_flags (GST_BIN (urisrc),
GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
}
static void
gst_uri_source_bin_finalize (GObject * obj)
{
GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (obj);
g_mutex_clear (&urisrc->lock);
g_mutex_clear (&urisrc->buffering_lock);
g_mutex_clear (&urisrc->buffering_post_lock);
g_free (urisrc->uri);
g_free (urisrc->download_dir);
gst_object_replace ((GstObject **) & urisrc->collection, NULL);
G_OBJECT_CLASS (parent_class)->finalize (obj);
}
static void
gst_uri_source_bin_update_connection_speed (GstURISourceBin * urisrc)
{
guint64 speed = 0;
GList *iter;
if (!urisrc->is_adaptive) {
return;
}
GST_OBJECT_LOCK (urisrc);
speed = urisrc->connection_speed / 1000;
GST_OBJECT_UNLOCK (urisrc);
GST_URI_SOURCE_BIN_LOCK (urisrc);
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
GParamSpec *pspec = NULL;
if (!info->demuxer)
continue;
pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (info->demuxer),
"connection-speed");
if (pspec != NULL)
g_object_set (info->demuxer, "connection-speed", speed, NULL);
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
}
static void
gst_uri_source_bin_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (object);
switch (prop_id) {
case PROP_URI:
GST_OBJECT_LOCK (urisrc);
g_free (urisrc->uri);
urisrc->uri = g_value_dup_string (value);
GST_OBJECT_UNLOCK (urisrc);
break;
case PROP_CONNECTION_SPEED:
GST_OBJECT_LOCK (urisrc);
urisrc->connection_speed = g_value_get_uint64 (value) * 1000;
GST_OBJECT_UNLOCK (urisrc);
gst_uri_source_bin_update_connection_speed (urisrc);
break;
case PROP_BUFFER_SIZE:
urisrc->buffer_size = g_value_get_int (value);
update_queue_values (urisrc);
break;
case PROP_BUFFER_DURATION:
urisrc->buffer_duration = g_value_get_int64 (value);
update_queue_values (urisrc);
break;
case PROP_DOWNLOAD:
urisrc->download = g_value_get_boolean (value);
break;
case PROP_DOWNLOAD_DIR:
g_free (urisrc->download_dir);
urisrc->download_dir = g_value_dup_string (value);
break;
case PROP_USE_BUFFERING:
urisrc->use_buffering = g_value_get_boolean (value);
break;
case PROP_RING_BUFFER_MAX_SIZE:
urisrc->ring_buffer_max_size = g_value_get_uint64 (value);
break;
case PROP_LOW_WATERMARK:
urisrc->low_watermark = g_value_get_double (value);
update_queue_values (urisrc);
break;
case PROP_HIGH_WATERMARK:
urisrc->high_watermark = g_value_get_double (value);
update_queue_values (urisrc);
break;
case PROP_PARSE_STREAMS:
urisrc->parse_streams = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_uri_source_bin_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (object);
switch (prop_id) {
case PROP_URI:
GST_OBJECT_LOCK (urisrc);
g_value_set_string (value, urisrc->uri);
GST_OBJECT_UNLOCK (urisrc);
break;
case PROP_SOURCE:
GST_OBJECT_LOCK (urisrc);
g_value_set_object (value, urisrc->source);
GST_OBJECT_UNLOCK (urisrc);
break;
case PROP_CONNECTION_SPEED:
GST_OBJECT_LOCK (urisrc);
g_value_set_uint64 (value, urisrc->connection_speed / 1000);
GST_OBJECT_UNLOCK (urisrc);
break;
case PROP_BUFFER_SIZE:
GST_OBJECT_LOCK (urisrc);
g_value_set_int (value, urisrc->buffer_size);
GST_OBJECT_UNLOCK (urisrc);
break;
case PROP_BUFFER_DURATION:
GST_OBJECT_LOCK (urisrc);
g_value_set_int64 (value, urisrc->buffer_duration);
GST_OBJECT_UNLOCK (urisrc);
break;
case PROP_DOWNLOAD:
g_value_set_boolean (value, urisrc->download);
break;
case PROP_DOWNLOAD_DIR:
g_value_set_string (value, urisrc->download_dir);
break;
case PROP_USE_BUFFERING:
g_value_set_boolean (value, urisrc->use_buffering);
break;
case PROP_RING_BUFFER_MAX_SIZE:
g_value_set_uint64 (value, urisrc->ring_buffer_max_size);
break;
case PROP_LOW_WATERMARK:
g_value_set_double (value, urisrc->low_watermark);
break;
case PROP_HIGH_WATERMARK:
g_value_set_double (value, urisrc->high_watermark);
break;
case PROP_STATISTICS:
g_value_take_boxed (value, get_queue_statistics (urisrc));
break;
case PROP_PARSE_STREAMS:
g_value_set_boolean (value, urisrc->parse_streams);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static GstEvent *
add_stream_start_custom_flag (GstEvent ** event)
{
GstStructure *s;
/* This is a temporary hack to notify downstream decodebin3 to *not*
* plug in an extra parsebin */
s = (GstStructure *) gst_event_get_structure (*event);
if (!gst_structure_has_field_typed (s, "urisourcebin-parsed-data",
G_TYPE_BOOLEAN)) {
*event = gst_event_make_writable (*event);
s = (GstStructure *) gst_event_get_structure (*event);
gst_structure_set (s, "urisourcebin-parsed-data", G_TYPE_BOOLEAN, TRUE,
NULL);
}
return *event;
}
typedef struct
{
GstPad *target_pad;
gboolean rewrite_stream_start;
} CopyEventData;
static gboolean
copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
{
CopyEventData *data = user_data;
GstPad *gpad = data->target_pad;
if (data->rewrite_stream_start &&
GST_EVENT_TYPE (*event) == GST_EVENT_STREAM_START) {
add_stream_start_custom_flag (event);
}
GST_DEBUG_OBJECT (gpad,
"store sticky event from %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad,
*event);
gst_pad_store_sticky_event (gpad, *event);
return TRUE;
}
static GstPadProbeReturn
demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot);
/* CALL WITH URISOURCEBIN LOCK */
static void
free_child_src_pad_info (ChildSrcPadInfo * info, GstURISourceBin * urisrc)
{
g_assert (info->src_pad);
GST_DEBUG_OBJECT (urisrc,
"Freeing ChildSrcPadInfo for %" GST_PTR_FORMAT, info->src_pad);
if (info->typefind) {
gst_element_set_state (info->typefind, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (urisrc), info->typefind);
}
gst_object_unref (info->src_pad);
if (info->demuxer) {
GST_DEBUG_OBJECT (urisrc, "Removing demuxer");
gst_element_set_state (info->demuxer, GST_STATE_NULL);
gst_bin_remove (GST_BIN_CAST (urisrc), info->demuxer);
}
g_list_foreach (info->outputs, (GFunc) free_output_slot, urisrc);
g_list_free (info->outputs);
if (info->multiqueue) {
GST_DEBUG_OBJECT (urisrc, "Removing multiqueue");
gst_element_set_state (info->multiqueue, GST_STATE_NULL);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->multiqueue));
gst_bin_remove (GST_BIN_CAST (urisrc), info->multiqueue);
}
if (info->pre_parse_queue) {
gst_element_set_state (info->pre_parse_queue, GST_STATE_NULL);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (info->pre_parse_queue));
gst_bin_remove (GST_BIN_CAST (urisrc), info->pre_parse_queue);
}
gst_object_replace ((GstObject **) & info->collection, NULL);
g_free (info);
}
static ChildSrcPadInfo *
get_cspi_for_pad (GstURISourceBin * urisrc, GstPad * pad)
{
GList *iter;
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
if (info->src_pad == pad)
return info;
}
return NULL;
}
static ChildSrcPadInfo *
new_child_src_pad_info (GstURISourceBin * urisrc, GstPad * pad)
{
ChildSrcPadInfo *info;
GST_LOG_OBJECT (urisrc, "New ChildSrcPadInfo for %" GST_PTR_FORMAT, pad);
info = g_new0 (ChildSrcPadInfo, 1);
info->urisrc = urisrc;
info->src_pad = gst_object_ref (pad);
urisrc->src_infos = g_list_append (urisrc->src_infos, info);
return info;
}
static OutputSlotInfo *
find_replacement_slot (ChildSrcPadInfo * info, GstStream * stream)
{
GList *iter;
for (iter = info->outputs; iter; iter = iter->next) {
OutputSlotInfo *slot = iter->data;
if (slot->pending_stream == stream)
return slot;
}
return NULL;
}
/* Called by the signal handlers when a demuxer has produced a new stream */
static void
new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
ChildSrcPadInfo * info)
{
GstURISourceBin *urisrc = info->urisrc;
OutputSlotInfo *slot = NULL;
GstPad *output_pad;
GST_DEBUG_OBJECT (element, "New pad %" GST_PTR_FORMAT, pad);
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* Double-check that the demuxer is streams-aware by checking if it posted a
* collection */
if (info->demuxer && !info->demuxer_is_parsebin
&& !info->demuxer_streams_aware) {
GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
("Adaptive demuxer is not streams-aware, check your installation"));
}
/* For parsebin source pads we want to check if this is a replacement pad for
* which we want to re-use an existing OutputSlotInfo */
if (info->demuxer_is_parsebin) {
GstStream *stream = gst_pad_get_stream (pad);
if (stream) {
slot = find_replacement_slot (info, stream);
gst_object_unref (stream);
if (slot) {
GST_DEBUG_OBJECT (pad, "Can re-use slot %s:%s",
GST_DEBUG_PAD_NAME (slot->originating_pad));
slot->pending_pad = gst_object_ref (pad);
slot->pending_probe_id =
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
GST_PAD_PROBE_TYPE_EVENT_FLUSH,
(GstPadProbeCallback) demux_pad_events, slot, NULL);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
}
GST_DEBUG_OBJECT (pad, "No existing output slot to re-use");
} else {
GST_WARNING_OBJECT (pad, "No GstStream on pad ??");
}
}
/* If the demuxer handles buffering and is streams-aware, we can expose it
as-is directly. We still add an event probe to deal with EOS */
slot = new_output_slot (info, pad);
output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, output_pad);
gst_object_unref (output_pad);
}
/* Called with lock held */
static gboolean
all_slots_are_eos (GstURISourceBin * urisrc)
{
GList *tmp;
for (tmp = urisrc->src_infos; tmp; tmp = tmp->next) {
ChildSrcPadInfo *cspi = tmp->data;
GList *iter2;
for (iter2 = cspi->outputs; iter2; iter2 = iter2->next) {
OutputSlotInfo *slot = (OutputSlotInfo *) iter2->data;
if (slot->is_eos == FALSE)
return FALSE;
}
}
return TRUE;
}
/* CALL WITH URISOURCEBIN LOCK */
static OutputSlotInfo *
output_slot_for_originating_pad (ChildSrcPadInfo * info,
GstPad * originating_pad)
{
GList *iter;
for (iter = info->outputs; iter; iter = iter->next) {
OutputSlotInfo *slot = iter->data;
if (slot->originating_pad == originating_pad)
return slot;
}
return NULL;
}
static GstPadProbeReturn
demux_pad_events (GstPad * pad, GstPadProbeInfo * info, OutputSlotInfo * slot)
{
GstURISourceBin *urisrc = slot->linked_info->urisrc;
GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
GstPadProbeReturn ret = GST_PAD_PROBE_OK;
GST_URI_SOURCE_BIN_LOCK (urisrc);
switch (GST_EVENT_TYPE (ev)) {
case GST_EVENT_EOS:
{
gboolean all_streams_eos;
GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
if (slot->pending_pad && pad != slot->pending_pad) {
GST_DEBUG_OBJECT (pad, "A pending pad is present, dropping");
ret = GST_PAD_PROBE_DROP;
break;
}
BUFFERING_LOCK (urisrc);
/* Mark that we fed an EOS to this slot */
slot->is_eos = TRUE;
all_streams_eos = all_slots_are_eos (urisrc);
BUFFERING_UNLOCK (urisrc);
if (slot->queue)
/* EOS means this element is no longer buffering */
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
if (all_streams_eos) {
GST_DEBUG_OBJECT (urisrc, "Posting about-to-finish");
g_signal_emit (urisrc,
gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL);
}
goto unlock_done;
}
case GST_EVENT_STREAM_START:
{
/* This is a temporary hack to notify downstream decodebin3 to *not*
* plug in an extra parsebin */
if (urisrc->is_adaptive || (slot->linked_info
&& slot->linked_info->demuxer_is_parsebin)) {
GST_PAD_PROBE_INFO_DATA (info) = add_stream_start_custom_flag (&ev);
}
}
/* PASSTHROUGH */
case GST_EVENT_FLUSH_STOP:
BUFFERING_LOCK (urisrc);
slot->is_eos = FALSE;
BUFFERING_UNLOCK (urisrc);
break;
default:
break;
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
unlock_done:
return ret;
}
static GstPadProbeReturn
pre_queue_event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
GstPadProbeReturn ret = GST_PAD_PROBE_OK;
GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
switch (GST_EVENT_TYPE (ev)) {
case GST_EVENT_EOS:
{
GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
g_signal_emit (urisrc,
gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL);
}
break;
default:
break;
}
return ret;
}
static GstStructure *
get_queue_statistics (GstURISourceBin * urisrc)
{
GstStructure *ret = NULL;
guint min_byte_level = 0, max_byte_level = 0;
guint64 min_time_level = 0, max_time_level = 0;
gdouble avg_byte_level = 0., avg_time_level = 0.;
guint i = 0;
GList *iter, *cur;
GST_URI_SOURCE_BIN_LOCK (urisrc);
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
for (cur = info->outputs; cur; cur = cur->next) {
OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
guint byte_limit = 0;
guint64 time_limit = 0;
if (!slot->queue)
continue;
g_object_get (slot->queue, "current-level-bytes", &byte_limit,
"current-level-time", &time_limit, NULL);
if (byte_limit < min_byte_level)
min_byte_level = byte_limit;
if (byte_limit > max_byte_level)
max_byte_level = byte_limit;
avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1);
if (time_limit < min_time_level)
min_time_level = time_limit;
if (time_limit > max_time_level)
max_time_level = time_limit;
avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1);
i++;
}
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
ret = gst_structure_new ("application/x-urisourcebin-stats",
"minimum-byte-level", G_TYPE_UINT, (guint) min_byte_level,
"maximum-byte-level", G_TYPE_UINT, (guint) max_byte_level,
"average-byte-level", G_TYPE_UINT, (guint) avg_byte_level,
"minimum-time-level", G_TYPE_UINT64, (guint64) min_time_level,
"maximum-time-level", G_TYPE_UINT64, (guint64) max_time_level,
"average-time-level", G_TYPE_UINT64, (guint64) avg_time_level, NULL);
return ret;
}
static void
update_queue_values (GstURISourceBin * urisrc)
{
gint64 duration;
guint buffer_size;
gdouble low_watermark, high_watermark;
guint64 cumulative_bitrate = 0;
GList *iter, *cur;
GST_URI_SOURCE_BIN_LOCK (urisrc);
duration = GET_BUFFER_DURATION (urisrc);
buffer_size = GET_BUFFER_SIZE (urisrc);
low_watermark = urisrc->low_watermark;
high_watermark = urisrc->high_watermark;
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
for (cur = info->outputs; cur; cur = cur->next) {
OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
guint64 bitrate = 0;
if (!slot->queue)
continue;
if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
"bitrate")) {
g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
}
if (bitrate > 0)
cumulative_bitrate += bitrate;
else {
GST_TRACE_OBJECT (urisrc,
"Unknown bitrate detected from %" GST_PTR_FORMAT
", resetting all bitrates", slot->queue);
cumulative_bitrate = 0;
break;
}
}
}
GST_DEBUG_OBJECT (urisrc, "recalculating queue limits with cumulative "
"bitrate %" G_GUINT64_FORMAT ", buffer size %u, buffer duration %"
G_GINT64_FORMAT, cumulative_bitrate, buffer_size, duration);
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
for (cur = info->outputs; cur; cur = cur->next) {
OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
guint byte_limit;
if (!slot->queue)
continue;
if (cumulative_bitrate > 0
&& g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
"bitrate")) {
guint64 bitrate;
g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
byte_limit =
gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate);
} else {
/* if not all queue's have valid bitrates, use the buffer-size as the
* limit */
byte_limit = buffer_size;
}
GST_DEBUG_OBJECT (urisrc,
"calculated new limits for queue-like element %" GST_PTR_FORMAT
", bytes:%u, time:%" G_GUINT64_FORMAT
", low-watermark:%f, high-watermark:%f",
slot->queue, byte_limit, (guint64) duration, low_watermark,
high_watermark);
g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit,
"max-size-time", (guint64) duration, "low-watermark", low_watermark,
"high-watermark", high_watermark, NULL);
}
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
}
static void
on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec,
gpointer user_data)
{
GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
gst_element_call_async (GST_ELEMENT (urisrc),
(GstElementCallAsyncFunc) update_queue_values, NULL, NULL);
}
static void
setup_downloadbuffer (GstURISourceBin * urisrc, GstElement * downloadbuffer)
{
gchar *temp_template, *filename;
const gchar *tmp_dir, *prgname;
if (urisrc->download_dir) {
tmp_dir = urisrc->download_dir;
if (g_mkdir_with_parents (tmp_dir, 0700) != 0) {
GST_ELEMENT_ERROR (urisrc, RESOURCE, SETTINGS,
(_("Failed to create download directory '%s'."), tmp_dir), ("%s",
g_strerror (errno)));
}
} else {
tmp_dir = g_get_user_cache_dir ();
}
prgname = g_get_prgname ();
if (prgname == NULL)
prgname = "GStreamer";
filename = g_strdup_printf ("%s-XXXXXX", prgname);
/* build our filename */
temp_template = g_build_filename (tmp_dir, filename, NULL);
GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
temp_template, tmp_dir, prgname, filename);
/* configure progressive download for selected media types */
g_object_set (downloadbuffer, "temp-template", temp_template, NULL);
g_free (filename);
g_free (temp_template);
}
static void
setup_multiqueue (GstURISourceBin * urisrc, ChildSrcPadInfo * info,
GstElement * multiqueue)
{
if (info->use_downloadbuffer || !urisrc->is_stream) {
/* If we have a downloadbuffer we will let that one deal with buffering,
and we only use multiqueue for dealing with interleave */
g_object_set (info->multiqueue, "use-buffering", FALSE, NULL);
} else {
/* Else we set the minimum interleave time of multiqueue to the required
* buffering duration and ask it to report buffering */
g_object_set (info->multiqueue, "use-buffering", TRUE,
"min-interleave-time", GET_BUFFER_DURATION (urisrc), NULL);
}
/* Common properties */
g_object_set (info->multiqueue,
"sync-by-running-time", TRUE,
"use-interleave", TRUE,
"max-size-bytes", 0,
"max-size-buffers", 0,
"low-watermark", urisrc->low_watermark,
"high-watermark", urisrc->high_watermark, NULL);
gst_bin_add (GST_BIN_CAST (urisrc), info->multiqueue);
gst_element_sync_state_with_parent (info->multiqueue);
}
/* Called with lock held */
static OutputSlotInfo *
new_output_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
{
GstURISourceBin *urisrc = info->urisrc;
OutputSlotInfo *slot;
GstPad *srcpad;
GstElement *queue = NULL;
const gchar *elem_name;
gboolean use_downloadbuffer;
CopyEventData copy_data = { 0, };
GST_DEBUG_OBJECT (urisrc,
"use_queue2:%d use_downloadbuffer:%d, demuxer:%d, originating_pad:%"
GST_PTR_FORMAT, info->use_queue2, info->use_downloadbuffer,
info->demuxer != NULL, originating_pad);
slot = g_new0 (OutputSlotInfo, 1);
slot->linked_info = info;
/* If a demuxer/parsebin is present, then the downloadbuffer will have been handled before that */
use_downloadbuffer = info->use_downloadbuffer && !info->demuxer;
/* If parsebin is used, we might have to go through a multiqueue */
if (urisrc->parse_streams && (info->use_queue2 || info->use_downloadbuffer
|| !urisrc->is_stream)) {
GST_DEBUG_OBJECT (urisrc, "Using multiqueue");
if (!info->multiqueue) {
GST_DEBUG_OBJECT (urisrc,
"Creating multiqueue for handling elementary streams");
elem_name = "multiqueue";
info->multiqueue = gst_element_factory_make (elem_name, NULL);
if (!info->multiqueue)
goto no_buffer_element;
setup_multiqueue (urisrc, info, info->multiqueue);
}
slot->queue_sinkpad =
gst_element_request_pad_simple (info->multiqueue, "sink_%u");
srcpad = gst_pad_get_single_internal_link (slot->queue_sinkpad);
if (urisrc->is_adaptive || (info->demuxer_is_parsebin)) {
copy_data.rewrite_stream_start = TRUE;
}
copy_data.target_pad = slot->queue_sinkpad;
gst_pad_sticky_events_foreach (originating_pad, copy_sticky_events,
&copy_data);
copy_data.target_pad = srcpad;
gst_pad_sticky_events_foreach (originating_pad, copy_sticky_events,
&copy_data);
if (info->demuxer) {
/* Make sure we add the event probe *before* linking */
slot->demuxer_event_probe_id =
gst_pad_add_probe (originating_pad,
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
(GstPadProbeCallback) demux_pad_events, slot, NULL);
}
slot->output_pad = create_output_pad (slot, srcpad);
gst_object_unref (srcpad);
gst_pad_link (originating_pad, slot->queue_sinkpad);
GST_PAD_STREAM_UNLOCK (originating_pad);
}
/* If buffering is required, create the element. If downloadbuffer is
* required, it will take precedence over queue2 */
else if (use_downloadbuffer || info->use_queue2) {
if (use_downloadbuffer)
elem_name = "downloadbuffer";
else
elem_name = "queue2";
queue = gst_element_factory_make (elem_name, NULL);
if (!queue)
goto no_buffer_element;
slot->queue = queue;
slot->bitrate_changed_id =
g_signal_connect (G_OBJECT (queue), "notify::bitrate",
(GCallback) on_queue_bitrate_changed, urisrc);
if (use_downloadbuffer) {
setup_downloadbuffer (urisrc, slot->queue);
} else {
g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
if (info->demuxer) {
/* If a adaptive demuxer or parsebin is used, use more accurate information */
g_object_set (queue, "use-tags-bitrate", TRUE, "use-rate-estimate",
FALSE, NULL);
} else {
GST_DEBUG_OBJECT (queue,
"Setting ring-buffer-max-size %" G_GUINT64_FORMAT,
urisrc->ring_buffer_max_size);
/* Else allow ring-buffer-max-size setting to be used */
g_object_set (queue, "ring-buffer-max-size",
urisrc->ring_buffer_max_size, NULL);
}
/* Disable max-size-buffers - queue based on data rate to the default time limit */
g_object_set (queue, "max-size-buffers", 0, NULL);
/* Don't start buffering until the queue is empty (< 1%).
* Start playback when the queue is 60% full, leaving a bit more room
* for upstream to push more without getting bursty */
g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL);
g_object_set (queue, "low-watermark", urisrc->low_watermark,
"high-watermark", urisrc->high_watermark, NULL);
}
/* set the necessary limits on the queue-like elements */
g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc),
"max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL);
gst_bin_add (GST_BIN_CAST (urisrc), queue);
gst_element_sync_state_with_parent (queue);
slot->queue_sinkpad = gst_element_get_static_pad (queue, "sink");
/* get the new raw srcpad */
srcpad = gst_element_get_static_pad (queue, "src");
slot->output_pad = create_output_pad (slot, srcpad);
gst_object_unref (srcpad);
gst_pad_link (originating_pad, slot->queue_sinkpad);
} else {
if (info->demuxer) {
/* Make sure we add the event probe *before* linking */
slot->demuxer_event_probe_id =
gst_pad_add_probe (originating_pad,
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
(GstPadProbeCallback) demux_pad_events, slot, NULL);
}
/* Expose pad directly */
slot->output_pad = create_output_pad (slot, originating_pad);
}
slot->originating_pad = gst_object_ref (originating_pad);
/* Store stream if present */
slot->stream = gst_pad_get_stream (originating_pad);
/* save output slot so we can remove it later */
info->outputs = g_list_append (info->outputs, slot);
GST_DEBUG_OBJECT (urisrc,
"New output_pad %" GST_PTR_FORMAT " for originating pad %" GST_PTR_FORMAT,
slot->output_pad, originating_pad);
if (slot->stream)
GST_DEBUG_OBJECT (urisrc, " and stream %" GST_PTR_FORMAT, slot->stream);
return slot;
no_buffer_element:
{
g_free (slot);
post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), elem_name);
return NULL;
}
}
static GstPadProbeReturn
source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
gpointer user_data)
{
GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
OutputSlotInfo *slot = user_data;
GstURISourceBin *urisrc = slot->linked_info->urisrc;
GST_LOG_OBJECT (pad, "%" GST_PTR_FORMAT, event);
/* A custom EOS will be received if an adaptive demuxer source pad removed a
* pad and buffering was present on that slot */
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (event),
CUSTOM_EOS_QUARK)) {
GstPadProbeReturn probe_ret = GST_PAD_PROBE_DROP;
GST_DEBUG_OBJECT (pad, "we received custom EOS");
/* remove custom-eos */
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event), CUSTOM_EOS_QUARK,
NULL, NULL);
GST_URI_SOURCE_BIN_LOCK (urisrc);
if (slot->is_eos) {
/* linked_info is old input which is still linked without removal */
GST_DEBUG_OBJECT (pad, "push actual EOS");
gst_pad_push_event (slot->output_pad, event);
probe_ret = GST_PAD_PROBE_HANDLED;
}
/* And finally remove the output. This is done asynchronously since we can't
* do it from the streaming thread */
free_output_slot_async (urisrc, slot);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return probe_ret;
}
/* never drop events */
return GST_PAD_PROBE_OK;
}
/* called when we found a raw pad to expose. We set up a
* padprobe to detect EOS before exposing the pad.
* Called with LOCK held. */
static GstPad *
create_output_pad (OutputSlotInfo * slot, GstPad * pad)
{
GstURISourceBin *urisrc = slot->linked_info->urisrc;
GstPad *newpad;
GstPadTemplate *pad_tmpl;
gchar *padname;
/* If the output slot does buffering, add a probe to detect drainage */
if (slot->queue)
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
source_pad_event_probe, slot, NULL);
pad_tmpl = gst_static_pad_template_get (&srctemplate);
padname = g_strdup_printf ("src_%u", urisrc->numpads);
urisrc->numpads++;
newpad = gst_ghost_pad_new_from_template (padname, pad, pad_tmpl);
gst_object_unref (pad_tmpl);
g_free (padname);
GST_DEBUG_OBJECT (urisrc, "Created output pad %s:%s for pad %s:%s",
GST_DEBUG_PAD_NAME (newpad), GST_DEBUG_PAD_NAME (pad));
return newpad;
}
static GstPadProbeReturn
expose_block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
GstURISourceBin *urisrc = (GstURISourceBin *) user_data;
gboolean expose = FALSE;
GST_DEBUG_OBJECT (pad, "blocking");
GST_URI_SOURCE_BIN_LOCK (urisrc);
while (!urisrc->activated && !g_atomic_int_get (&urisrc->flushing)) {
GST_DEBUG_OBJECT (urisrc, "activated:%d flushing:%d", urisrc->activated,
g_atomic_int_get (&urisrc->flushing));
g_cond_wait (&urisrc->activation_cond, &urisrc->lock);
}
GST_DEBUG_OBJECT (urisrc, "activated:%d flushing:%d", urisrc->activated,
g_atomic_int_get (&urisrc->flushing));
if (!g_atomic_int_get (&urisrc->flushing))
expose = TRUE;
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
if (expose)
gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
GST_DEBUG_OBJECT (pad, "Done blocking, removing probe");
return GST_PAD_PROBE_REMOVE;
}
static void
expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
{
GstPad *target;
CopyEventData copy_data = { 0, };
if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc)))
return; /* Pad is already exposed */
target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
gst_pad_set_active (pad, TRUE);
copy_data.target_pad = pad;
gst_pad_sticky_events_foreach (target, copy_sticky_events, &copy_data);
gst_object_unref (target);
GST_URI_SOURCE_BIN_LOCK (urisrc);
if (!urisrc->activated) {
GST_DEBUG_OBJECT (urisrc, "Not fully activated, adding pad once PAUSED !");
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
expose_block_probe, urisrc, NULL);
pad = NULL;
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
if (pad) {
GST_DEBUG_OBJECT (urisrc, "Exposing pad %" GST_PTR_FORMAT, pad);
gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
}
}
static void
demuxer_pad_removed_cb (GstElement * element, GstPad * pad,
ChildSrcPadInfo * info)
{
GstURISourceBin *urisrc;
OutputSlotInfo *slot;
/* we only care about srcpads */
if (!GST_PAD_IS_SRC (pad))
return;
urisrc = info->urisrc;
GST_DEBUG_OBJECT (urisrc, "pad removed name: <%s:%s>",
GST_DEBUG_PAD_NAME (pad));
GST_URI_SOURCE_BIN_LOCK (urisrc);
slot = output_slot_for_originating_pad (info, pad);
g_assert (slot);
if (slot->demuxer_event_probe_id) {
gst_pad_remove_probe (pad, slot->demuxer_event_probe_id);
slot->demuxer_event_probe_id = 0;
}
if (slot->pending_pad) {
/* Switch over to pending pad */
GST_DEBUG_OBJECT (urisrc, "Switching to pending pad <%s:%s>",
GST_DEBUG_PAD_NAME (slot->pending_pad));
slot->demuxer_event_probe_id = slot->pending_probe_id;
slot->pending_probe_id = 0;
gst_object_unref (slot->originating_pad);
slot->originating_pad = slot->pending_pad;
slot->pending_pad = NULL;
gst_object_unref (slot->stream);
slot->stream = slot->pending_stream;
slot->pending_stream = NULL;
if (slot->queue_sinkpad) {
gst_pad_link (slot->originating_pad, slot->queue_sinkpad);
} else {
gst_ghost_pad_set_target ((GstGhostPad *) slot->output_pad,
slot->originating_pad);
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
}
if (slot->queue) {
gboolean was_eos;
/* Propagate custom EOS to buffering elements. The slot will be removed when
* it is received on the output of the buffering elements */
BUFFERING_LOCK (urisrc);
/* Unlink this pad from its output slot and send a fake EOS event
* to drain the queue */
was_eos = slot->is_eos;
slot->is_eos = TRUE;
BUFFERING_UNLOCK (urisrc);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
if (!was_eos) {
GstStructure *s;
GstEvent *event;
event = gst_event_new_eos ();
s = gst_event_writable_structure (event);
gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE,
NULL);
gst_pad_send_event (slot->queue_sinkpad, event);
}
} else {
GST_LOG_OBJECT (urisrc,
"No buffering involved, removing output slot immediately");
/* Remove output slot immediately */
info->outputs = g_list_remove (info->outputs, slot);
free_output_slot (slot, urisrc);
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
}
/* helper function to lookup stuff in lists */
static gboolean
array_has_value (const gchar * values[], const gchar * value)
{
gint i;
for (i = 0; values[i]; i++) {
if (g_str_has_prefix (value, values[i]))
return TRUE;
}
return FALSE;
}
static gboolean
array_has_uri_value (const gchar * values[], const gchar * value)
{
gint i;
for (i = 0; values[i]; i++) {
if (!g_ascii_strncasecmp (value, values[i], strlen (values[i])))
return TRUE;
}
return FALSE;
}
/* list of URIs that we consider to be streams and that need buffering.
* We have no mechanism yet to figure this out with a query. */
static const gchar *stream_uris[] = { "http://", "https://", "mms://",
"mmsh://", "mmsu://", "mmst://", "fd://", "myth://", "ssh://",
"ftp://", "sftp://",
NULL
};
/* list of URIs that need a queue because they are pretty bursty */
static const gchar *queue_uris[] = { "cdda://", NULL };
/* blacklisted URIs, we know they will always fail. */
static const gchar *blacklisted_uris[] = { NULL };
/* media types that use adaptive streaming */
static const gchar *adaptive_media[] = {
"application/x-hls", "application/vnd.ms-sstr+xml",
"application/dash+xml", NULL
};
#define IS_STREAM_URI(uri) (array_has_uri_value (stream_uris, uri))
#define IS_QUEUE_URI(uri) (array_has_uri_value (queue_uris, uri))
#define IS_BLACKLISTED_URI(uri) (array_has_uri_value (blacklisted_uris, uri))
#define IS_ADAPTIVE_MEDIA(media) (array_has_value (adaptive_media, media))
/*
* Generate and configure a source element.
*/
static GstElement *
gen_source_element (GstURISourceBin * urisrc)
{
GObjectClass *source_class;
GstElement *source;
GParamSpec *pspec;
GError *err = NULL;
if (!urisrc->uri)
goto no_uri;
GST_LOG_OBJECT (urisrc, "finding source for %s", urisrc->uri);
if (!gst_uri_is_valid (urisrc->uri))
goto invalid_uri;
if (IS_BLACKLISTED_URI (urisrc->uri))
goto uri_blacklisted;
source = gst_element_make_from_uri (GST_URI_SRC, urisrc->uri, NULL, &err);
if (!source)
goto no_source;
GST_LOG_OBJECT (urisrc, "found source type %s", G_OBJECT_TYPE_NAME (source));
source_class = G_OBJECT_GET_CLASS (source);
/* Propagate connection speed */
pspec = g_object_class_find_property (source_class, "connection-speed");
if (pspec != NULL) {
guint64 speed = urisrc->connection_speed / 1000;
gboolean wrong_type = FALSE;
if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_UINT) {
GParamSpecUInt *pspecuint = G_PARAM_SPEC_UINT (pspec);
speed = CLAMP (speed, pspecuint->minimum, pspecuint->maximum);
} else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_INT) {
GParamSpecInt *pspecint = G_PARAM_SPEC_INT (pspec);
speed = CLAMP (speed, pspecint->minimum, pspecint->maximum);
} else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_UINT64) {
GParamSpecUInt64 *pspecuint = G_PARAM_SPEC_UINT64 (pspec);
speed = CLAMP (speed, pspecuint->minimum, pspecuint->maximum);
} else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_INT64) {
GParamSpecInt64 *pspecint = G_PARAM_SPEC_INT64 (pspec);
speed = CLAMP (speed, pspecint->minimum, pspecint->maximum);
} else {
GST_WARNING_OBJECT (urisrc,
"The connection speed property %" G_GUINT64_FORMAT
" of type %s is not useful. Not setting it", speed,
g_type_name (G_PARAM_SPEC_TYPE (pspec)));
wrong_type = TRUE;
}
if (!wrong_type) {
g_object_set (source, "connection-speed", speed, NULL);
GST_DEBUG_OBJECT (urisrc,
"setting connection-speed=%" G_GUINT64_FORMAT " to source element",
speed);
}
}
return source;
/* ERRORS */
no_uri:
{
GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
(_("No URI specified to play from.")), (NULL));
return NULL;
}
invalid_uri:
{
GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
(_("Invalid URI \"%s\"."), urisrc->uri), (NULL));
g_clear_error (&err);
return NULL;
}
uri_blacklisted:
{
GST_ELEMENT_ERROR (urisrc, RESOURCE, FAILED,
(_("This stream type cannot be played yet.")), (NULL));
return NULL;
}
no_source:
{
/* whoops, could not create the source element, dig a little deeper to
* figure out what might be wrong. */
if (err != NULL && err->code == GST_URI_ERROR_UNSUPPORTED_PROTOCOL) {
gchar *prot;
prot = gst_uri_get_protocol (urisrc->uri);
if (prot == NULL)
goto invalid_uri;
gst_element_post_message (GST_ELEMENT_CAST (urisrc),
gst_missing_uri_source_message_new (GST_ELEMENT (urisrc), prot));
GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN,
(_("No URI handler implemented for \"%s\"."), prot), (NULL));
g_free (prot);
} else {
GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
("%s", (err) ? err->message : "URI was not accepted by any element"),
("No element accepted URI '%s'", urisrc->uri));
}
g_clear_error (&err);
return NULL;
}
}
static gboolean
is_all_raw_caps (GstCaps * caps, GstCaps * rawcaps, gboolean * all_raw)
{
GstCaps *intersection;
gint capssize;
gboolean res = FALSE;
if (caps == NULL)
return FALSE;
capssize = gst_caps_get_size (caps);
/* no caps, skip and move to the next pad */
if (capssize == 0 || gst_caps_is_empty (caps) || gst_caps_is_any (caps))
goto done;
intersection = gst_caps_intersect (caps, rawcaps);
*all_raw = !gst_caps_is_empty (intersection)
&& (gst_caps_get_size (intersection) == capssize);
gst_caps_unref (intersection);
res = TRUE;
done:
return res;
}
static void
post_missing_plugin_error (GstElement * urisrc, const gchar * element_name)
{
GstMessage *msg;
msg = gst_missing_element_message_new (urisrc, element_name);
gst_element_post_message (urisrc, msg);
GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN,
(_("Missing element '%s' - check your GStreamer installation."),
element_name), (NULL));
}
typedef struct
{
GstURISourceBin *urisrc;
gboolean have_out;
gboolean res;
} AnalyseData;
static void
analyse_pad_foreach (const GValue * item, AnalyseData * data)
{
GstURISourceBin *urisrc = data->urisrc;
GstPad *pad = g_value_dup_object (item);
ChildSrcPadInfo *info;
GstCaps *padcaps = NULL;
gboolean pad_is_raw;
gboolean res = TRUE;
GST_LOG_OBJECT (urisrc, "pad %" GST_PTR_FORMAT, pad);
data->have_out = TRUE;
/* The info might already exist if there was an iterator resync */
if (get_cspi_for_pad (urisrc, pad)) {
GST_LOG_OBJECT (urisrc, "Already analysed");
goto out;
}
info = new_child_src_pad_info (urisrc, pad);
padcaps = gst_pad_query_caps (pad, NULL);
if (!is_all_raw_caps (padcaps, DEFAULT_CAPS, &pad_is_raw) || !pad_is_raw) {
/* if FALSE, this pad has no caps, we setup typefinding on it */
if (!setup_typefind (info)) {
res = FALSE;
goto out;
}
} else if (pad_is_raw) {
/* caps on source pad are all raw, we can add the pad */
GstPad *output_pad;
OutputSlotInfo *slot;
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* Only use buffering (via queue2) on raw pads in very specific
* conditions */
info->use_queue2 = urisrc->use_buffering && IS_QUEUE_URI (urisrc->uri);
GST_DEBUG_OBJECT (urisrc, "use_buffering:%d is_queue:%d",
urisrc->use_buffering, IS_QUEUE_URI (urisrc->uri));
slot = new_output_slot (info, pad);
if (!slot) {
res = FALSE;
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
goto out;
}
/* get the new raw srcpad */
output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, output_pad);
gst_object_unref (output_pad);
} else {
GST_DEBUG_OBJECT (urisrc, "Handling non-raw pad");
/* The caps are non-raw, we handle it directly */
handle_new_pad (info, pad, padcaps);
}
out:
if (padcaps)
gst_caps_unref (padcaps);
gst_object_unref (pad);
data->res &= res;
}
/**
* analyse_source_and_expose_raw_pads:
* @urisrc: a #GstURISourceBin
* @all_pads_raw: are all pads raw data
* @have_out: does the source have output
* @is_dynamic: is this a dynamic source
*
* Check the source of @urisrc and collect information about it.
*
* All pads will be handled directly. Raw pads are exposed as-is. Pads without
* any caps will have a typefind appended to them, and other pads will be
* analysed further.
*
* @is_raw will be set to TRUE if the source only produces raw pads. When this
* function returns, all of the raw pad of the source will be added
* to @urisrc
*
* @have_out: will be set to TRUE if the source has output pads.
*
* @is_dynamic: TRUE if the element will create (more) pads dynamically later
* on.
*
* Returns: FALSE if a fatal error occurred while scanning.
*/
static gboolean
analyse_source_and_expose_raw_pads (GstURISourceBin * urisrc,
gboolean * have_out, gboolean * is_dynamic)
{
GstElementClass *elemclass;
AnalyseData data = { 0, };
GstIteratorResult iterres;
GList *walk;
GstIterator *pads_iter;
gboolean res = TRUE;
/* Collect generic information about the source */
urisrc->is_stream = IS_STREAM_URI (urisrc->uri);
if (!urisrc->is_stream) {
GstQuery *query;
GstSchedulingFlags flags;
/* do a final check to see if the source element is streamable */
query = gst_query_new_scheduling ();
if (gst_element_query (urisrc->source, query)) {
gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
if ((flags & GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED))
urisrc->is_stream = TRUE;
}
gst_query_unref (query);
}
if (urisrc->is_stream) {
GObjectClass *source_class = G_OBJECT_GET_CLASS (urisrc->source);
GParamSpec *pspec = g_object_class_find_property (source_class, "is-live");
/* Live sources are not streamable */
if (pspec && G_PARAM_SPEC_VALUE_TYPE (pspec) == G_TYPE_BOOLEAN) {
gboolean is_live;
g_object_get (G_OBJECT (urisrc->source), "is-live", &is_live, NULL);
if (is_live)
urisrc->is_stream = FALSE;
}
}
GST_LOG_OBJECT (urisrc, "source is stream: %d", urisrc->is_stream);
/* Handle the existing source pads */
pads_iter = gst_element_iterate_src_pads (urisrc->source);
restart:
data.res = TRUE;
data.have_out = FALSE;
data.urisrc = urisrc;
iterres =
gst_iterator_foreach (pads_iter,
(GstIteratorForeachFunction) analyse_pad_foreach, &data);
if (iterres == GST_ITERATOR_RESYNC)
goto restart;
if (iterres == GST_ITERATOR_ERROR)
res = FALSE;
else
res = data.res;
gst_iterator_free (pads_iter);
/* check for padtemplates that list SOMETIMES pads to
* determine if the element is dynamic. */
*is_dynamic = FALSE;
elemclass = GST_ELEMENT_GET_CLASS (urisrc->source);
walk = gst_element_class_get_pad_template_list (elemclass);
while (walk != NULL) {
GstPadTemplate *templ;
templ = (GstPadTemplate *) walk->data;
if (GST_PAD_TEMPLATE_DIRECTION (templ) == GST_PAD_SRC) {
if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_SOMETIMES)
*is_dynamic = TRUE;
break;
}
walk = g_list_next (walk);
}
*have_out = data.have_out;
return res;
}
/* make a demuxer and connect to all the signals */
static GstElement *
make_demuxer (GstURISourceBin * urisrc, ChildSrcPadInfo * info, GstCaps * caps)
{
GList *factories, *eligible, *cur;
GstElement *demuxer = NULL;
GParamSpec *pspec;
GST_LOG_OBJECT (urisrc, "making new adaptive demuxer");
/* now create the demuxer element */
/* FIXME: Fire a signal to get the demuxer? */
factories = gst_element_factory_list_get_elements
(GST_ELEMENT_FACTORY_TYPE_DEMUXER, GST_RANK_MARGINAL);
eligible =
gst_element_factory_list_filter (factories, caps, GST_PAD_SINK,
gst_caps_is_fixed (caps));
gst_plugin_feature_list_free (factories);
if (eligible == NULL)
goto no_demuxer;
eligible = g_list_sort (eligible, gst_plugin_feature_rank_compare_func);
for (cur = eligible; cur != NULL; cur = g_list_next (cur)) {
GstElementFactory *factory = (GstElementFactory *) (cur->data);
const gchar *klass =
gst_element_factory_get_metadata (factory, GST_ELEMENT_METADATA_KLASS);
/* Can't be a demuxer unless it has Demux in the klass name */
if (!strstr (klass, "Demux") || !strstr (klass, "Adaptive"))
continue;
demuxer = gst_element_factory_create (factory, NULL);
break;
}
gst_plugin_feature_list_free (eligible);
if (!demuxer)
goto no_demuxer;
GST_DEBUG_OBJECT (urisrc, "Created adaptive demuxer %" GST_PTR_FORMAT,
demuxer);
/* set up callbacks to create the links between
* demuxer streams and output */
g_signal_connect (demuxer,
"pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info);
g_signal_connect (demuxer,
"pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info);
/* Propagate connection-speed property */
pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (demuxer),
"connection-speed");
if (pspec != NULL)
g_object_set (demuxer,
"connection-speed", urisrc->connection_speed / 1000, NULL);
return demuxer;
/* ERRORS */
no_demuxer:
{
/* FIXME: Fire the right error */
GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
("No demuxer element, check your installation"));
return NULL;
}
}
static gboolean
setup_parsebin_for_slot (ChildSrcPadInfo * info, GstPad * originating_pad)
{
GstURISourceBin *urisrc = info->urisrc;
GstPad *sinkpad;
GstPadLinkReturn link_res;
GST_DEBUG_OBJECT (urisrc, "Setting up parsebin for %" GST_PTR_FORMAT,
originating_pad);
if (g_atomic_int_get (&urisrc->flushing)) {
GST_DEBUG_OBJECT (urisrc, "Shutting down, returning early");
return FALSE;
}
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* Set up optional pre-parsebin download/ringbuffer elements */
if (info->use_downloadbuffer || urisrc->ring_buffer_max_size) {
if (info->use_downloadbuffer) {
GST_DEBUG_OBJECT (urisrc, "Setting up pre-parsebin downloadbuffer");
info->pre_parse_queue = gst_element_factory_make ("downloadbuffer", NULL);
setup_downloadbuffer (urisrc, info->pre_parse_queue);
g_object_set (info->pre_parse_queue, "max-size-bytes",
GET_BUFFER_SIZE (urisrc), "max-size-time",
(guint64) GET_BUFFER_DURATION (urisrc), NULL);
} else if (urisrc->ring_buffer_max_size) {
/* If a ring-buffer-max-size is specified with parsebin, we set it up on
* the queue2 *before* parsebin. We will use its buffering levels instead
* of the ones from multiqueue */
GST_DEBUG_OBJECT (urisrc,
"Setting up pre-parsebin queue2 for ring-buffer-max-size %"
G_GUINT64_FORMAT, urisrc->ring_buffer_max_size);
info->pre_parse_queue = gst_element_factory_make ("queue2", NULL);
/* We do not use this queue2 for buffering levels, but the multiqueue */
g_object_set (info->pre_parse_queue, "use-buffering", FALSE,
"ring-buffer-max-size", urisrc->ring_buffer_max_size,
"max-size-buffers", 0, NULL);
}
gst_bin_add (GST_BIN_CAST (urisrc), info->pre_parse_queue);
sinkpad = gst_element_get_static_pad (info->pre_parse_queue, "sink");
link_res = gst_pad_link (originating_pad, sinkpad);
gst_object_unref (sinkpad);
if (link_res != GST_PAD_LINK_OK)
goto could_not_link;
}
info->demuxer = gst_element_factory_make ("parsebin", NULL);
if (!info->demuxer) {
post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "parsebin");
return FALSE;
}
gst_bin_add (GST_BIN_CAST (urisrc), info->demuxer);
info->demuxer_is_parsebin = TRUE;
if (info->pre_parse_queue) {
if (!gst_element_link_pads (info->pre_parse_queue, "src", info->demuxer,
"sink"))
goto could_not_link;
} else {
sinkpad = gst_element_get_static_pad (info->demuxer, "sink");
link_res = gst_pad_link (originating_pad, sinkpad);
gst_object_unref (sinkpad);
if (link_res != GST_PAD_LINK_OK)
goto could_not_link;
}
/* set up callbacks to create the links between parsebin and output */
g_signal_connect (info->demuxer,
"pad-added", G_CALLBACK (new_demuxer_pad_added_cb), info);
g_signal_connect (info->demuxer,
"pad-removed", G_CALLBACK (demuxer_pad_removed_cb), info);
if (info->pre_parse_queue) {
gst_element_sync_state_with_parent (info->pre_parse_queue);
}
gst_element_sync_state_with_parent (info->demuxer);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return TRUE;
could_not_link:
{
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
(NULL), ("Can't link to (pre-)parsebin element"));
return FALSE;
}
}
/* Called when:
* * Source element adds a new pad
* * typefind has found a type
*/
static void
handle_new_pad (ChildSrcPadInfo * info, GstPad * srcpad, GstCaps * caps)
{
GstURISourceBin *urisrc = info->urisrc;
gboolean is_raw;
GstStructure *s;
const gchar *media_type;
GST_URI_SOURCE_BIN_LOCK (urisrc);
/* if this is a pad with all raw caps, we can expose it */
if (is_all_raw_caps (caps, DEFAULT_CAPS, &is_raw) && is_raw) {
OutputSlotInfo *slot;
GstPad *output_pad;
GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
", exposing", caps);
slot = new_output_slot (info, srcpad);
output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, slot->output_pad);
gst_object_unref (output_pad);
return;
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
s = gst_caps_get_structure (caps, 0);
media_type = gst_structure_get_name (s);
urisrc->is_adaptive = IS_ADAPTIVE_MEDIA (media_type);
if (urisrc->is_adaptive) {
GstPad *sinkpad;
GstPadLinkReturn link_res;
GstQuery *query;
info->demuxer = make_demuxer (urisrc, info, caps);
if (!info->demuxer)
goto no_demuxer;
gst_bin_add (GST_BIN_CAST (urisrc), info->demuxer);
/* Query the demuxer to see if it can handle buffering */
query = gst_query_new_buffering (GST_FORMAT_TIME);
info->use_queue2 = urisrc->use_buffering
&& !gst_element_query (info->demuxer, query);
gst_query_unref (query);
GST_DEBUG_OBJECT (urisrc, "Demuxer handles buffering : %d",
info->demuxer_handles_buffering);
sinkpad = gst_element_get_static_pad (info->demuxer, "sink");
if (sinkpad == NULL)
goto no_demuxer_sink;
link_res = gst_pad_link (srcpad, sinkpad);
gst_object_unref (sinkpad);
if (link_res != GST_PAD_LINK_OK)
goto could_not_link;
gst_element_sync_state_with_parent (info->demuxer);
} else if (!urisrc->is_stream) {
if (urisrc->parse_streams) {
/* GST_URI_SOURCE_BIN_LOCK (urisrc); */
setup_parsebin_for_slot (info, srcpad);
/* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */
} else {
/* We don't need buffering here, expose immediately */
OutputSlotInfo *slot;
GstPad *output_pad;
GST_URI_SOURCE_BIN_LOCK (urisrc);
slot = new_output_slot (info, srcpad);
output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, output_pad);
gst_object_unref (output_pad);
}
} else {
/* only enable download buffering if the upstream duration is known */
if (urisrc->download) {
GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES);
if (gst_pad_query (srcpad, query)) {
gint64 dur;
gst_query_parse_duration (query, NULL, &dur);
info->use_downloadbuffer = (dur != -1);
}
gst_query_unref (query);
}
info->use_queue2 = urisrc->use_buffering;
if (urisrc->parse_streams) {
/* GST_URI_SOURCE_BIN_LOCK (urisrc); */
setup_parsebin_for_slot (info, srcpad);
/* GST_URI_SOURCE_BIN_UNLOCK (urisrc); */
} else {
OutputSlotInfo *slot;
GstPad *output_pad;
GST_URI_SOURCE_BIN_LOCK (urisrc);
slot = new_output_slot (info, srcpad);
gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
pre_queue_event_probe, urisrc, NULL);
output_pad = gst_object_ref (slot->output_pad);
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
expose_output_pad (urisrc, output_pad);
gst_object_unref (output_pad);
}
}
return;
/* ERRORS */
no_demuxer:
{
/* error was posted */
return;
}
no_demuxer_sink:
{
GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
(NULL), ("Adaptive demuxer element has no 'sink' pad"));
return;
}
could_not_link:
{
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
(NULL), ("Can't link typefind to adaptive demuxer element"));
return;
}
}
/* signaled when we have a stream and we need to configure the download
* buffering or regular buffering */
static void
type_found (GstElement * typefind, guint probability,
GstCaps * caps, ChildSrcPadInfo * info)
{
GstURISourceBin *urisrc = info->urisrc;
GstPad *srcpad = gst_element_get_static_pad (typefind, "src");
GST_DEBUG_OBJECT (urisrc, "typefind found caps %" GST_PTR_FORMAT
" on pad %" GST_PTR_FORMAT, caps, srcpad);
handle_new_pad (info, srcpad, caps);
gst_object_unref (GST_OBJECT (srcpad));
}
/* setup typefind for any source. This will first plug a typefind element to the
* source. After we find the type, we decide to whether to plug an adaptive
* demuxer, or just link through queue2 (if needed) and expose the data */
static gboolean
setup_typefind (ChildSrcPadInfo * info)
{
GstURISourceBin *urisrc = info->urisrc;
GstPad *sinkpad;
/* now create the typefind element */
info->typefind = gst_element_factory_make ("typefind", NULL);
if (!info->typefind)
goto no_typefind;
/* Make sure the bin doesn't set the typefind running yet */
gst_element_set_locked_state (info->typefind, TRUE);
gst_bin_add (GST_BIN_CAST (urisrc), info->typefind);
sinkpad = gst_element_get_static_pad (info->typefind, "sink");
if (gst_pad_link (info->src_pad, sinkpad) != GST_PAD_LINK_OK)
goto could_not_link;
gst_object_unref (sinkpad);
/* connect a signal to find out when the typefind element found
* a type */
g_signal_connect (info->typefind, "have-type", G_CALLBACK (type_found), info);
/* Now it can start */
gst_element_set_locked_state (info->typefind, FALSE);
gst_element_sync_state_with_parent (info->typefind);
return TRUE;
/* ERRORS */
no_typefind:
{
post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "typefind");
GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
("No typefind element, check your installation"));
return FALSE;
}
could_not_link:
{
gst_object_unref (sinkpad);
gst_element_set_locked_state (info->typefind, FALSE);
GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
(NULL), ("Can't link source to typefind element"));
return FALSE;
}
}
/* CALL WITH URISOURCEBIN LOCK */
static void
free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
{
GST_DEBUG_OBJECT (urisrc,
"removing output slot %" GST_PTR_FORMAT " -> %" GST_PTR_FORMAT,
slot->originating_pad, slot->output_pad);
if (slot->queue) {
if (slot->bitrate_changed_id > 0)
g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id);
slot->bitrate_changed_id = 0;
gst_element_set_locked_state (slot->queue, TRUE);
gst_element_set_state (slot->queue, GST_STATE_NULL);
remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
}
if (slot->queue_sinkpad) {
if (slot->linked_info && slot->linked_info->multiqueue)
gst_element_release_request_pad (slot->linked_info->multiqueue,
slot->queue_sinkpad);
gst_object_replace ((GstObject **) & slot->queue_sinkpad, NULL);
}
if (slot->demuxer_event_probe_id)
gst_pad_remove_probe (slot->originating_pad, slot->demuxer_event_probe_id);
if (slot->pending_pad) {
if (slot->pending_probe_id)
gst_pad_remove_probe (slot->pending_pad, slot->pending_probe_id);
gst_object_unref (slot->pending_pad);
}
if (slot->stream)
gst_object_unref (slot->stream);
if (slot->pending_stream)
gst_object_unref (slot->pending_stream);
gst_object_unref (slot->originating_pad);
/* deactivate and remove the srcpad */
gst_pad_set_active (slot->output_pad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), slot->output_pad);
g_free (slot);
}
static void
call_free_output_slot (GstURISourceBin * urisrc, OutputSlotInfo * slot)
{
GST_LOG_OBJECT (urisrc, "free output slot in thread pool");
free_output_slot (slot, urisrc);
}
/* must be called with GST_URI_SOURCE_BIN_LOCK */
static void
free_output_slot_async (GstURISourceBin * urisrc, OutputSlotInfo * slot)
{
GST_LOG_OBJECT (urisrc, "pushing output slot on thread pool to free");
slot->linked_info->outputs = g_list_remove (slot->linked_info->outputs, slot);
gst_element_call_async (GST_ELEMENT_CAST (urisrc),
(GstElementCallAsyncFunc) call_free_output_slot, slot, NULL);
}
/* remove source and all related elements */
static void
remove_source (GstURISourceBin * urisrc)
{
if (urisrc->source) {
GstElement *source = urisrc->source;
GST_DEBUG_OBJECT (urisrc, "removing old src element");
gst_element_set_state (source, GST_STATE_NULL);
if (urisrc->src_np_sig_id) {
g_signal_handler_disconnect (source, urisrc->src_np_sig_id);
urisrc->src_np_sig_id = 0;
}
gst_bin_remove (GST_BIN_CAST (urisrc), source);
urisrc->source = NULL;
}
GST_URI_SOURCE_BIN_LOCK (urisrc);
if (urisrc->src_infos) {
g_list_foreach (urisrc->src_infos, (GFunc) free_child_src_pad_info, urisrc);
g_list_free (urisrc->src_infos);
urisrc->src_infos = NULL;
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
}
/* is called when a dynamic source element created a new pad. */
static void
source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
{
GstCaps *caps;
ChildSrcPadInfo *info = new_child_src_pad_info (urisrc, pad);
GST_DEBUG_OBJECT (urisrc, "Found new pad %s.%s in source element %s",
GST_DEBUG_PAD_NAME (pad), GST_ELEMENT_NAME (element));
caps = gst_pad_get_current_caps (pad);
GST_DEBUG_OBJECT (urisrc, "caps %" GST_PTR_FORMAT, caps);
if (caps == NULL)
setup_typefind (info);
else {
handle_new_pad (info, pad, caps);
gst_caps_unref (caps);
}
}
/* construct and run the source and demuxer elements until we found
* all the streams or until a preroll queue has been filled.
*/
static gboolean
setup_source (GstURISourceBin * urisrc)
{
gboolean have_out, is_dynamic;
GST_DEBUG_OBJECT (urisrc, "setup source");
/* create and configure an element that can handle the uri */
if (!(urisrc->source = gen_source_element (urisrc)))
goto no_source;
/* state will be merged later - if file is not found, error will be
* handled by the application right after. */
gst_bin_add (GST_BIN_CAST (urisrc), urisrc->source);
/* notify of the new source used and allow external users to do final
* modifications before activating the element */
g_object_notify (G_OBJECT (urisrc), "source");
g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_SOURCE_SETUP],
0, urisrc->source);
if (gst_element_set_state (urisrc->source,
GST_STATE_READY) != GST_STATE_CHANGE_SUCCESS)
goto state_fail;
/* see if the source element emits raw audio/video all by itself,
* if so, we can create streams for the pads and be done with it.
* Also check that is has source pads, if not, we assume it will
* do everything itself. */
if (!analyse_source_and_expose_raw_pads (urisrc, &have_out, &is_dynamic))
goto invalid_source;
if (!is_dynamic) {
if (!have_out)
goto no_pads;
} else {
GST_DEBUG_OBJECT (urisrc, "Source has dynamic output pads");
/* connect a handler for the new-pad signal */
urisrc->src_np_sig_id =
g_signal_connect (urisrc->source, "pad-added",
G_CALLBACK (source_new_pad), urisrc);
}
return TRUE;
/* ERRORS */
no_source:
{
/* error message was already posted */
return FALSE;
}
invalid_source:
{
GST_ELEMENT_ERROR (urisrc, CORE, FAILED,
(_("Source element is invalid.")), (NULL));
return FALSE;
}
state_fail:
{
GST_ELEMENT_ERROR (urisrc, CORE, FAILED,
(_("Source element can't be prepared")), (NULL));
return FALSE;
}
no_pads:
{
GST_ELEMENT_ERROR (urisrc, CORE, FAILED,
(_("Source element has no pads.")), (NULL));
return FALSE;
}
}
static void
value_list_append_structure_list (GValue * list_val, GstStructure ** first,
GList * structure_list)
{
GList *l;
for (l = structure_list; l != NULL; l = l->next) {
GValue val = { 0, };
if (*first == NULL)
*first = gst_structure_copy ((GstStructure *) l->data);
g_value_init (&val, GST_TYPE_STRUCTURE);
g_value_take_boxed (&val, gst_structure_copy ((GstStructure *) l->data));
gst_value_list_append_value (list_val, &val);
g_value_unset (&val);
}
}
/* if it's a redirect message with multiple redirect locations we might
* want to pick a different 'best' location depending on the required
* bitrates and the connection speed */
static GstMessage *
handle_redirect_message (GstURISourceBin * urisrc, GstMessage * msg)
{
const GValue *locations_list, *location_val;
GstMessage *new_msg;
GstStructure *new_structure = NULL;
GList *l_good = NULL, *l_neutral = NULL, *l_bad = NULL;
GValue new_list = { 0, };
guint size, i;
const GstStructure *structure;
GST_DEBUG_OBJECT (urisrc, "redirect message: %" GST_PTR_FORMAT, msg);
GST_DEBUG_OBJECT (urisrc, "connection speed: %" G_GUINT64_FORMAT,
urisrc->connection_speed);
structure = gst_message_get_structure (msg);
if (urisrc->connection_speed == 0 || structure == NULL)
return msg;
locations_list = gst_structure_get_value (structure, "locations");
if (locations_list == NULL)
return msg;
size = gst_value_list_get_size (locations_list);
if (size < 2)
return msg;
/* maintain existing order as much as possible, just sort references
* with too high a bitrate to the end (the assumption being that if
* bitrates are given they are given for all interesting streams and
* that the you-need-at-least-version-xyz redirect has the same bitrate
* as the lowest referenced redirect alternative) */
for (i = 0; i < size; ++i) {
const GstStructure *s;
gint bitrate = 0;
location_val = gst_value_list_get_value (locations_list, i);
s = (const GstStructure *) g_value_get_boxed (location_val);
if (!gst_structure_get_int (s, "minimum-bitrate", &bitrate) || bitrate <= 0) {
GST_DEBUG_OBJECT (urisrc, "no bitrate: %" GST_PTR_FORMAT, s);
l_neutral = g_list_append (l_neutral, (gpointer) s);
} else if (bitrate > urisrc->connection_speed) {
GST_DEBUG_OBJECT (urisrc, "bitrate too high: %" GST_PTR_FORMAT, s);
l_bad = g_list_append (l_bad, (gpointer) s);
} else if (bitrate <= urisrc->connection_speed) {
GST_DEBUG_OBJECT (urisrc, "bitrate OK: %" GST_PTR_FORMAT, s);
l_good = g_list_append (l_good, (gpointer) s);
}
}
g_value_init (&new_list, GST_TYPE_LIST);
value_list_append_structure_list (&new_list, &new_structure, l_good);
value_list_append_structure_list (&new_list, &new_structure, l_neutral);
value_list_append_structure_list (&new_list, &new_structure, l_bad);
gst_structure_take_value (new_structure, "locations", &new_list);
g_list_free (l_good);
g_list_free (l_neutral);
g_list_free (l_bad);
new_msg = gst_message_new_element (msg->src, new_structure);
gst_message_unref (msg);
GST_DEBUG_OBJECT (urisrc, "new redirect message: %" GST_PTR_FORMAT, new_msg);
return new_msg;
}
/* CALL WITH URISOURCEBIN LOCK */
static OutputSlotInfo *
output_slot_for_buffering_element (GstURISourceBin * urisrc,
GstElement * element)
{
GList *top, *iter;
for (top = urisrc->src_infos; top; top = top->next) {
ChildSrcPadInfo *info = top->data;
for (iter = info->outputs; iter; iter = iter->next) {
OutputSlotInfo *slot = iter->data;
if (slot->queue == element)
return slot;
}
}
return NULL;
}
static void
handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
{
gint perc, msg_perc;
gint smaller_perc = 100;
GstMessage *smaller = NULL;
GList *found = NULL;
GList *iter;
OutputSlotInfo *slot;
/* buffering messages must be aggregated as there might be multiple buffering
* elements in the pipeline and their independent buffering messages will
* confuse the application
*
* urisourcebin keeps a list of messages received from elements that are
* buffering.
* Rules are:
* 0) Ignore buffering from elements that are draining (is_eos == TRUE)
* 1) Always post the smaller buffering %
* 2) If an element posts a 100% buffering message, remove it from the list
* 3) When there are no more messages on the list, post 100% message
* 4) When an element posts a new buffering message, update the one
* on the list to this new value
*/
gst_message_parse_buffering (msg, &msg_perc);
GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
" with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
BUFFERING_LOCK (urisrc);
slot =
output_slot_for_buffering_element (urisrc,
(GstElement *) GST_MESSAGE_SRC (msg));
if (slot && slot->is_eos) {
/* Ignore buffering messages from queues we marked as EOS,
* we already removed those from the list of buffering
* objects */
BUFFERING_UNLOCK (urisrc);
gst_message_replace (&msg, NULL);
return;
}
g_mutex_lock (&urisrc->buffering_post_lock);
/*
* Single loop for 2 things:
* 1) Look for a message with the same source
* 1.1) If the received message is 100%, remove it from the list
* 2) Find the minimum buffering from the list from elements that aren't EOS
*/
for (iter = urisrc->buffering_status; iter;) {
GstMessage *bufstats = iter->data;
gboolean is_eos = FALSE;
slot =
output_slot_for_buffering_element (urisrc,
(GstElement *) GST_MESSAGE_SRC (msg));
if (slot)
is_eos = slot->is_eos;
if (GST_MESSAGE_SRC (bufstats) == GST_MESSAGE_SRC (msg)) {
found = iter;
if (msg_perc < 100) {
gst_message_unref (iter->data);
bufstats = iter->data = gst_message_ref (msg);
} else {
GList *current = iter;
/* remove the element here and avoid confusing the loop */
iter = g_list_next (iter);
gst_message_unref (current->data);
urisrc->buffering_status =
g_list_delete_link (urisrc->buffering_status, current);
continue;
}
}
/* only update minimum stat for non-EOS slots */
if (!is_eos) {
gst_message_parse_buffering (bufstats, &perc);
if (perc < smaller_perc) {
smaller_perc = perc;
smaller = bufstats;
}
} else {
GST_LOG_OBJECT (urisrc, "Ignoring buffering from EOS element");
}
iter = g_list_next (iter);
}
if (found == NULL && msg_perc < 100) {
if (msg_perc < smaller_perc) {
smaller_perc = msg_perc;
smaller = msg;
}
urisrc->buffering_status =
g_list_prepend (urisrc->buffering_status, gst_message_ref (msg));
}
if (smaller_perc == urisrc->last_buffering_pct) {
/* Don't repeat our last buffering status */
gst_message_replace (&msg, NULL);
} else {
urisrc->last_buffering_pct = smaller_perc;
/* now compute the buffering message that should be posted */
if (smaller_perc == 100) {
g_assert (urisrc->buffering_status == NULL);
/* we are posting the original received msg */
} else {
gst_message_replace (&msg, smaller);
}
}
BUFFERING_UNLOCK (urisrc);
if (msg) {
GST_LOG_OBJECT (urisrc, "Sending buffering msg from %" GST_PTR_FORMAT
" with %d%%", GST_MESSAGE_SRC (msg), smaller_perc);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN (urisrc), msg);
} else {
GST_LOG_OBJECT (urisrc, "Dropped buffering msg as a repeat of %d%%",
smaller_perc);
}
g_mutex_unlock (&urisrc->buffering_post_lock);
}
/* Remove any buffering message from the given source */
static void
remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
{
GList *iter;
gboolean removed = FALSE, post;
BUFFERING_LOCK (urisrc);
g_mutex_lock (&urisrc->buffering_post_lock);
GST_DEBUG_OBJECT (urisrc, "Removing %" GST_PTR_FORMAT
" buffering messages", src);
for (iter = urisrc->buffering_status; iter;) {
GstMessage *bufstats = iter->data;
if (GST_MESSAGE_SRC (bufstats) == src) {
gst_message_unref (bufstats);
urisrc->buffering_status =
g_list_delete_link (urisrc->buffering_status, iter);
removed = TRUE;
break;
}
iter = g_list_next (iter);
}
post = (removed && urisrc->buffering_status == NULL);
BUFFERING_UNLOCK (urisrc);
if (post) {
GST_DEBUG_OBJECT (urisrc, "Last buffering element done - posting 100%%");
/* removed the last buffering element, post 100% */
gst_element_post_message (GST_ELEMENT_CAST (urisrc),
gst_message_new_buffering (GST_OBJECT_CAST (urisrc), 100));
}
g_mutex_unlock (&urisrc->buffering_post_lock);
}
static ChildSrcPadInfo *
find_adaptive_demuxer_cspi_for_msg (GstURISourceBin * urisrc,
GstElement * child)
{
ChildSrcPadInfo *res = NULL;
GList *tmp;
GstElement *parent = gst_object_ref (child);
do {
GstElement *next_parent;
for (tmp = urisrc->src_infos; tmp; tmp = tmp->next) {
ChildSrcPadInfo *info = tmp->data;
if (parent == info->demuxer) {
res = info;
break;
}
}
next_parent = (GstElement *) gst_element_get_parent (parent);
gst_object_unref (parent);
parent = next_parent;
} while (parent && parent != (GstElement *) urisrc);
if (parent)
gst_object_unref (parent);
return res;
}
static GstStream *
find_compatible_stream (GList * streams, GstStream * stream)
{
GList *iter;
GstStreamType stream_type = gst_stream_get_stream_type (stream);
for (iter = streams; iter; iter = iter->next) {
GstStream *candidate = iter->data;
if (gst_stream_get_stream_type (candidate) == stream_type)
return candidate;
}
return NULL;
}
static void
handle_parsebin_collection (ChildSrcPadInfo * info,
GstStreamCollection * collection)
{
GList *unused_slots = NULL, *iter;
GList *streams = NULL;
GList *unused_streams = NULL;
guint i, nb_streams;
nb_streams = gst_stream_collection_get_size (collection);
for (i = 0; i < nb_streams; i++)
streams =
g_list_append (streams, gst_stream_collection_get_stream (collection,
i));
unused_streams = g_list_copy (streams);
/* Get list of output info slots not present in the collection */
for (iter = info->outputs; iter; iter = iter->next) {
OutputSlotInfo *output = iter->data;
if (!output->stream)
continue;
if (!gst_playback_utils_stream_in_list (streams, output->stream)) {
GST_DEBUG_OBJECT (output->originating_pad,
"No longer used in new collection");
unused_slots = g_list_append (unused_slots, output);
} else {
GList *iter2 = unused_streams;
/* Stream is re-used, remove it from unused streams we will try to
* re-assign further down */
for (iter2 = unused_streams; iter2; iter2 = iter2->next) {
GstStream *stream = iter2->data;
if (!g_strcmp0 (output->stream->stream_id, stream->stream_id)) {
/* Replace the pending stream by the incoming stream */
gst_object_replace ((GstObject **) & output->pending_stream,
(GstObject *) stream);
unused_streams = g_list_remove (unused_streams, stream);
break;
}
}
}
}
/* For each of those slots, check if there is a unused compatible stream from
* the collection that could be assigned to it */
for (iter = unused_slots; iter; iter = iter->next) {
OutputSlotInfo *output = iter->data;
GstStream *replacement =
find_compatible_stream (unused_streams, output->stream);
if (replacement) {
GST_DEBUG_OBJECT (output->originating_pad, "Assigning stream %s",
gst_stream_get_stream_id (replacement));
gst_object_replace ((GstObject **) & output->pending_stream,
(GstObject *) replacement);
unused_streams = g_list_remove (unused_streams, replacement);
}
}
g_list_free (unused_slots);
g_list_free (streams);
g_list_free (unused_streams);
/* Store the collection */
gst_object_replace ((GstObject **) & info->collection,
(GstObject *) collection);
}
/* uri_source_bin_aggregate_collection:
*
* Go over the collections provided by the various parsebin (via
* ChildSrcPadInfo) and provide a unified collection.
*
* If there are more than one collection and they are different, a new
* aggregated collection will be returned.
*
* In all cases, the collection will be stored as the uri source bin main
* collection.
*
* Return: (transfer full): The collection, or %NULL.
*/
static GstStreamCollection *
uri_source_bin_aggregate_collection (GstURISourceBin * urisrc)
{
GList *iter;
GList *streams = NULL;
gboolean collections_need_merging = FALSE;
GstStreamCollection *res = NULL;
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
GST_DEBUG_OBJECT (info->demuxer, "collection %" GST_PTR_FORMAT,
info->collection);
if (res && info->collection && info->collection != res) {
collections_need_merging = TRUE;
break;
}
if (info->collection)
res = info->collection;
}
if (!collections_need_merging) {
GST_DEBUG_OBJECT (urisrc, "No need to aggregate");
goto store_exit;
}
res = gst_stream_collection_new ("urisourcebin");
for (iter = urisrc->src_infos; iter; iter = iter->next) {
ChildSrcPadInfo *info = iter->data;
if (info->collection) {
guint i, len;
len = gst_stream_collection_get_size (info->collection);
for (i = 0; i < len; i++) {
GstStream *stream =
gst_stream_collection_get_stream (info->collection, i);
if (!gst_playback_utils_stream_in_list (streams, stream)) {
streams = g_list_append (streams, stream);
}
}
}
}
for (iter = streams; iter; iter = iter->next) {
GstStream *stream = iter->data;
gst_stream_collection_add_stream (res, gst_object_ref (stream));
}
if (streams)
g_list_free (streams);
store_exit:
gst_object_replace ((GstObject **) & urisrc->collection, (GstObject *) res);
GST_DEBUG_OBJECT (urisrc, "Aggregated collection %" GST_PTR_FORMAT, res);
return res ? gst_object_ref (res) : NULL;
}
static void
handle_message (GstBin * bin, GstMessage * msg)
{
GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (bin);
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_ELEMENT:{
if (gst_message_has_name (msg, "redirect")) {
/* sort redirect messages based on the connection speed. This simplifies
* the user of this element as it can in most cases just pick the first item
* of the sorted list as a good redirection candidate. It can of course
* choose something else from the list if it has a better way. */
msg = handle_redirect_message (urisrc, msg);
}
break;
}
case GST_MESSAGE_STREAM_COLLECTION:
{
ChildSrcPadInfo *info;
/* We only want to forward stream collections:
* * from the source element
* * OR from adaptive demuxers
* * OR from parsebin if there is only a single ChildSrcPadInfo
*
* We do not want to forward them if there are multiple parsebins since it
* requires them to be aggregated to be coherent to the outside world.
*/
GST_URI_SOURCE_BIN_LOCK (urisrc);
info =
find_adaptive_demuxer_cspi_for_msg (urisrc,
(GstElement *) GST_MESSAGE_SRC (msg));
if (info) {
info->demuxer_streams_aware = TRUE;
if (info->demuxer_is_parsebin) {
GstStreamCollection *collection = NULL;
gst_message_parse_stream_collection (msg, &collection);
GST_DEBUG_OBJECT (bin, "Seen collection %" GST_PTR_FORMAT,
collection);
/* Check if some output slots can/could be re-used with this new collection */
if (collection) {
GstStreamCollection *aggregated = NULL;
handle_parsebin_collection (info, collection);
aggregated = uri_source_bin_aggregate_collection (urisrc);
if (aggregated != collection) {
gst_message_unref (msg);
msg =
gst_message_new_stream_collection ((GstObject *) urisrc,
aggregated);
}
if (aggregated) {
/* Remove ref obtained from aggregate_collection() */
gst_object_unref (aggregated);
}
gst_object_unref (collection);
}
}
} else if (GST_MESSAGE_SRC (msg) != (GstObject *) urisrc->source) {
GST_LOG_OBJECT (bin, "Collection %" GST_PTR_FORMAT, msg);
GST_DEBUG_OBJECT (bin,
"Dropping stream-collection from %"
GST_PTR_FORMAT, GST_MESSAGE_SRC (msg));
gst_message_unref (msg);
msg = NULL;
}
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
}
break;
case GST_MESSAGE_BUFFERING:
handle_buffering_message (urisrc, msg);
msg = NULL;
break;
case GST_MESSAGE_ERROR:
case GST_MESSAGE_WARNING:
if (g_atomic_int_get (&urisrc->flushing)) {
/* Errors/warnings when shutting down are non-critical */
GST_DEBUG_OBJECT (urisrc, "Flushing, ignoring message %" GST_PTR_FORMAT,
msg);
gst_message_unref (msg);
msg = NULL;
}
break;
default:
break;
}
if (msg)
GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
}
/* generic struct passed to all query fold methods
* FIXME, move to core.
*/
typedef struct
{
GstQuery *query;
gint64 min;
gint64 max;
gboolean seekable;
gboolean live;
} QueryFold;
typedef void (*QueryInitFunction) (GstURISourceBin * urisrc, QueryFold * fold);
typedef void (*QueryDoneFunction) (GstURISourceBin * urisrc, QueryFold * fold);
/* for duration/position we collect all durations/positions and take
* the MAX of all valid results */
static void
uri_source_query_init (GstURISourceBin * urisrc, QueryFold * fold)
{
fold->min = 0;
fold->max = -1;
fold->seekable = TRUE;
fold->live = 0;
}
static gboolean
uri_source_query_duration_fold (const GValue * item, GValue * ret,
QueryFold * fold)
{
GstPad *pad = g_value_get_object (item);
if (gst_pad_query (pad, fold->query)) {
gint64 duration;
g_value_set_boolean (ret, TRUE);
gst_query_parse_duration (fold->query, NULL, &duration);
GST_DEBUG_OBJECT (item, "got duration %" G_GINT64_FORMAT, duration);
if (duration > fold->max)
fold->max = duration;
}
return TRUE;
}
static void
uri_source_query_duration_done (GstURISourceBin * urisrc, QueryFold * fold)
{
GstFormat format;
gst_query_parse_duration (fold->query, &format, NULL);
/* store max in query result */
gst_query_set_duration (fold->query, format, fold->max);
GST_DEBUG ("max duration %" G_GINT64_FORMAT, fold->max);
}
static gboolean
uri_source_query_position_fold (const GValue * item, GValue * ret,
QueryFold * fold)
{
GstPad *pad = g_value_get_object (item);
if (gst_pad_query (pad, fold->query)) {
gint64 position;
g_value_set_boolean (ret, TRUE);
gst_query_parse_position (fold->query, NULL, &position);
GST_DEBUG_OBJECT (item, "got position %" G_GINT64_FORMAT, position);
if (position > fold->max)
fold->max = position;
}
return TRUE;
}
static void
uri_source_query_position_done (GstURISourceBin * urisrc, QueryFold * fold)
{
GstFormat format;
gst_query_parse_position (fold->query, &format, NULL);
/* store max in query result */
gst_query_set_position (fold->query, format, fold->max);
GST_DEBUG_OBJECT (urisrc, "max position %" G_GINT64_FORMAT, fold->max);
}
static gboolean
uri_source_query_latency_fold (const GValue * item, GValue * ret,
QueryFold * fold)
{
GstPad *pad = g_value_get_object (item);
if (gst_pad_query (pad, fold->query)) {
GstClockTime min, max;
gboolean live;
gst_query_parse_latency (fold->query, &live, &min, &max);
GST_DEBUG_OBJECT (pad,
"got latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
if (live) {
/* for the combined latency we collect the MAX of all min latencies and
* the MIN of all max latencies */
if (min > fold->min)
fold->min = min;
if (fold->max == -1)
fold->max = max;
else if (max < fold->max)
fold->max = max;
fold->live = TRUE;
}
} else {
GST_LOG_OBJECT (pad, "latency query failed");
g_value_set_boolean (ret, FALSE);
}
return TRUE;
}
static void
uri_source_query_latency_done (GstURISourceBin * urisrc, QueryFold * fold)
{
/* store max in query result */
gst_query_set_latency (fold->query, fold->live, fold->min, fold->max);
GST_DEBUG_OBJECT (urisrc,
"latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
", live %d", GST_TIME_ARGS (fold->min), GST_TIME_ARGS (fold->max),
fold->live);
}
/* we are seekable if all srcpads are seekable */
static gboolean
uri_source_query_seeking_fold (const GValue * item, GValue * ret,
QueryFold * fold)
{
GstPad *pad = g_value_get_object (item);
if (gst_pad_query (pad, fold->query)) {
gboolean seekable;
g_value_set_boolean (ret, TRUE);
gst_query_parse_seeking (fold->query, NULL, &seekable, NULL, NULL);
GST_DEBUG_OBJECT (item, "got seekable %d", seekable);
if (fold->seekable)
fold->seekable = seekable;
}
return TRUE;
}
static void
uri_source_query_seeking_done (GstURISourceBin * urisrc, QueryFold * fold)
{
GstFormat format;
gst_query_parse_seeking (fold->query, &format, NULL, NULL, NULL);
gst_query_set_seeking (fold->query, format, fold->seekable, 0, -1);
GST_DEBUG_OBJECT (urisrc, "seekable %d", fold->seekable);
}
/* generic fold, return first valid result */
static gboolean
uri_source_query_generic_fold (const GValue * item, GValue * ret,
QueryFold * fold)
{
GstPad *pad = g_value_get_object (item);
gboolean res;
if ((res = gst_pad_query (pad, fold->query))) {
g_value_set_boolean (ret, TRUE);
GST_DEBUG_OBJECT (item, "answered query %p", fold->query);
}
/* and stop as soon as we have a valid result */
return !res;
}
/* we're a bin, the default query handler iterates sink elements, which we don't
* have normally. We should just query all source pads.
*/
static gboolean
gst_uri_source_bin_query (GstElement * element, GstQuery * query)
{
GstURISourceBin *urisrc;
gboolean res = FALSE;
GstIterator *iter;
GstIteratorFoldFunction fold_func;
QueryInitFunction fold_init = NULL;
QueryDoneFunction fold_done = NULL;
QueryFold fold_data;
GValue ret = { 0 };
gboolean default_ret = FALSE;
urisrc = GST_URI_SOURCE_BIN (element);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_DURATION:
/* iterate and collect durations */
fold_func = (GstIteratorFoldFunction) uri_source_query_duration_fold;
fold_init = uri_source_query_init;
fold_done = uri_source_query_duration_done;
break;
case GST_QUERY_POSITION:
/* iterate and collect durations */
fold_func = (GstIteratorFoldFunction) uri_source_query_position_fold;
fold_init = uri_source_query_init;
fold_done = uri_source_query_position_done;
break;
case GST_QUERY_LATENCY:
/* iterate and collect durations */
fold_func = (GstIteratorFoldFunction) uri_source_query_latency_fold;
fold_init = uri_source_query_init;
fold_done = uri_source_query_latency_done;
default_ret = TRUE;
break;
case GST_QUERY_SEEKING:
/* iterate and collect durations */
fold_func = (GstIteratorFoldFunction) uri_source_query_seeking_fold;
fold_init = uri_source_query_init;
fold_done = uri_source_query_seeking_done;
break;
case GST_QUERY_SELECTABLE:
/* This is a special case, we need to respond to it potentially before any
* pads are present, so we directly target childsrcpadinfos or the source */
if (urisrc->is_adaptive) {
/* Forward directly to adaptive demuxer */
if (urisrc->src_infos) {
ChildSrcPadInfo *child = urisrc->src_infos->data;
if (child->demuxer) {
return gst_element_query (child->demuxer, query);
}
}
GST_ERROR_OBJECT (urisrc,
"Source is adaptive ... but no adaptive demuxer present ?");
return FALSE;
}
/* FIXME: Theoretically we should fold/try on all chilsrcpadinfo, but they
* might not have exposed any pads yet and as of this implementation there
* are no known SELECTABLE support in other elements, so we fallback on the
* source element */
return gst_element_query (urisrc->source, query);
default:
fold_func = (GstIteratorFoldFunction) uri_source_query_generic_fold;
break;
}
fold_data.query = query;
g_value_init (&ret, G_TYPE_BOOLEAN);
g_value_set_boolean (&ret, default_ret);
iter = gst_element_iterate_src_pads (element);
GST_DEBUG_OBJECT (element, "Sending query %p (type %d) to src pads",
query, GST_QUERY_TYPE (query));
if (fold_init)
fold_init (urisrc, &fold_data);
while (TRUE) {
GstIteratorResult ires;
ires = gst_iterator_fold (iter, fold_func, &ret, &fold_data);
switch (ires) {
case GST_ITERATOR_RESYNC:
gst_iterator_resync (iter);
if (fold_init)
fold_init (urisrc, &fold_data);
g_value_set_boolean (&ret, default_ret);
break;
case GST_ITERATOR_OK:
case GST_ITERATOR_DONE:
res = g_value_get_boolean (&ret);
if (fold_done != NULL && res)
fold_done (urisrc, &fold_data);
goto done;
default:
res = FALSE;
goto done;
}
}
done:
gst_iterator_free (iter);
return res;
}
static GstStateChangeReturn
gst_uri_source_bin_change_state (GstElement * element,
GstStateChange transition)
{
GstStateChangeReturn ret;
GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (element);
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
g_atomic_int_set (&urisrc->flushing, 0);
GST_URI_SOURCE_BIN_LOCK (element);
urisrc->activated = FALSE;
GST_URI_SOURCE_BIN_UNLOCK (element);
GST_DEBUG ("ready to paused");
if (!setup_source (urisrc))
goto source_failed;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
g_atomic_int_set (&urisrc->flushing, 1);
GST_URI_SOURCE_BIN_LOCK (element);
g_cond_broadcast (&urisrc->activation_cond);
GST_URI_SOURCE_BIN_UNLOCK (element);
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (ret == GST_STATE_CHANGE_FAILURE)
goto setup_failed;
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
{
GST_URI_SOURCE_BIN_LOCK (element);
GST_DEBUG_OBJECT (urisrc, "Potentially exposing pads");
urisrc->activated = TRUE;
g_cond_broadcast (&urisrc->activation_cond);
GST_URI_SOURCE_BIN_UNLOCK (element);
}
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
GST_DEBUG ("paused to ready");
remove_source (urisrc);
g_list_free_full (urisrc->buffering_status,
(GDestroyNotify) gst_message_unref);
urisrc->buffering_status = NULL;
urisrc->last_buffering_pct = -1;
break;
default:
break;
}
return ret;
/* ERRORS */
source_failed:
{
remove_source (urisrc);
return GST_STATE_CHANGE_FAILURE;
}
setup_failed:
{
if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
remove_source (urisrc);
return GST_STATE_CHANGE_FAILURE;
}
}