adaptivedemux: improved bitrate estimations

Bitrate estimation is now handled through a queue2 element added after
the source element used to download fragments.

Original hlsdemux patch by Duncan Palmer <dpalmer@digisoft.tv>
https://bugzilla.gnome.org/show_bug.cgi?id=733959
This commit is contained in:
Philippe Normand 2015-09-04 09:59:06 +02:00 committed by Thiago Santos
parent 8a78e788b0
commit ccff3be3ab
2 changed files with 87 additions and 73 deletions

View file

@ -135,6 +135,7 @@ GST_DEBUG_CATEGORY (adaptivedemux_debug);
#define DEFAULT_LOOKBACK_FRAGMENTS 3 #define DEFAULT_LOOKBACK_FRAGMENTS 3
#define DEFAULT_CONNECTION_SPEED 0 #define DEFAULT_CONNECTION_SPEED 0
#define DEFAULT_BITRATE_LIMIT 0.8 #define DEFAULT_BITRATE_LIMIT 0.8
#define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024 /* For safety. Large enough to hold a segment. */
#define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock)) #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
#define GST_MANIFEST_LOCK(d) g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); #define GST_MANIFEST_LOCK(d) g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d));
@ -1062,8 +1063,6 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
stream->pad = pad; stream->pad = pad;
stream->demux = demux; stream->demux = demux;
stream->fragment_bitrates =
g_malloc0 (sizeof (guint64) * demux->num_lookback_fragments);
gst_pad_set_element_private (pad, stream); gst_pad_set_element_private (pad, stream);
gst_pad_set_query_function (pad, gst_pad_set_query_function (pad,
@ -1150,8 +1149,6 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
g_cond_clear (&stream->fragment_download_cond); g_cond_clear (&stream->fragment_download_cond);
g_mutex_clear (&stream->fragment_download_lock); g_mutex_clear (&stream->fragment_download_lock);
g_free (stream->fragment_bitrates);
if (stream->pad) { if (stream->pad) {
gst_object_unref (stream->pad); gst_object_unref (stream->pad);
stream->pad = NULL; stream->pad = NULL;
@ -1620,48 +1617,12 @@ gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
stream->pending_events = g_list_append (stream->pending_events, event); stream->pending_events = g_list_append (stream->pending_events, event);
} }
/* must be called with manifest_lock taken */
static guint64
_update_average_bitrate (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
{
gint index = stream->moving_index % demux->num_lookback_fragments;
stream->moving_bitrate -= stream->fragment_bitrates[index];
stream->fragment_bitrates[index] = new_bitrate;
stream->moving_bitrate += new_bitrate;
stream->moving_index += 1;
if (stream->moving_index > demux->num_lookback_fragments)
return stream->moving_bitrate / demux->num_lookback_fragments;
return stream->moving_bitrate / stream->moving_index;
}
/* must be called with manifest_lock taken */ /* must be called with manifest_lock taken */
static guint64 static guint64
gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux, gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream) GstAdaptiveDemuxStream * stream)
{ {
guint64 average_bitrate; guint64 bitrate;
guint64 fragment_bitrate;
fragment_bitrate =
(stream->fragment_total_size * 8) /
((double) stream->fragment_total_time / G_GUINT64_CONSTANT (1000000));
stream->fragment_total_size = 0;
stream->fragment_total_time = 0;
average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
fragment_bitrate);
GST_INFO_OBJECT (stream,
"Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
demux->num_lookback_fragments, average_bitrate);
/* Conservative approach, make sure we don't upgrade too fast */
stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
if (demux->connection_speed) { if (demux->connection_speed) {
GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it", GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
@ -1669,11 +1630,12 @@ gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
return demux->connection_speed; return demux->connection_speed;
} }
stream->current_download_rate *= demux->bitrate_limit; g_object_get (stream->queue, "avg-in-rate", &bitrate, NULL);
GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %" bitrate *= 8;
G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate); GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
bitrate);
return stream->current_download_rate; stream->current_download_rate = bitrate;
return bitrate;
} }
/* must be called with manifest_lock taken */ /* must be called with manifest_lock taken */
@ -1897,10 +1859,6 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
g_get_monotonic_time () - stream->download_chunk_start_time; g_get_monotonic_time () - stream->download_chunk_start_time;
stream->download_total_bytes += gst_buffer_get_size (buffer); stream->download_total_bytes += gst_buffer_get_size (buffer);
stream->fragment_total_size += gst_buffer_get_size (buffer);
stream->fragment_total_time +=
g_get_monotonic_time () - stream->download_chunk_start_time;
gst_adapter_push (stream->adapter, buffer); gst_adapter_push (stream->adapter, buffer);
GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT
". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer), ". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer),
@ -2101,6 +2059,14 @@ gst_adaptive_demux_stream_clear_eos_and_flush_state (GstAdaptiveDemuxStream *
gst_object_unref (internal_pad); gst_object_unref (internal_pad);
} }
static void
gst_adaptive_demux_stream_queue_overrun (GstElement * queue, gpointer user_data)
{
GstAdaptiveDemuxStream *stream = (GstAdaptiveDemuxStream *) user_data;
GST_WARNING_OBJECT (stream->pad,
"Queue overrun! The fragment to download is too big according to the current queue size limit");
}
/* must be called with manifest_lock taken */ /* must be called with manifest_lock taken */
static gboolean static gboolean
gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
@ -2118,7 +2084,7 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
gchar *old_protocol, *new_protocol; gchar *old_protocol, *new_protocol;
gchar *old_uri; gchar *old_uri;
old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->src)); old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
old_protocol = gst_uri_get_protocol (old_uri); old_protocol = gst_uri_get_protocol (old_uri);
new_protocol = gst_uri_get_protocol (uri); new_protocol = gst_uri_get_protocol (uri);
@ -2133,7 +2099,8 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
GError *err = NULL; GError *err = NULL;
GST_DEBUG_OBJECT (demux, "Re-using old source element"); GST_DEBUG_OBJECT (demux, "Re-using old source element");
if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->src), uri, &err)) { if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
&err)) {
GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s", GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
err->message); err->message);
g_clear_error (&err); g_clear_error (&err);
@ -2150,22 +2117,44 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
} }
if (stream->src == NULL) { if (stream->src == NULL) {
GstPad *uri_handler_src;
GstPad *queue_sink;
GstPad *queue_src;
GstElement *uri_handler;
GstElement *queue;
GstPadLinkReturn pad_link_ret;
GObjectClass *gobject_class; GObjectClass *gobject_class;
GstPad *internal_pad; GstPad *internal_pad;
stream->src = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL); /* Our src consists of a bin containing uri_handler -> queue2 . The
if (stream->src == NULL) { * purpose of the queue2 is to allow the uri_handler to download an
* entire fragment without blocking, so we can accurately measure the
* download bitrate. */
queue = gst_element_factory_make ("queue2", NULL);
if (queue == NULL)
return FALSE;
g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
g_object_set (queue, "max-size-time", (guint64) 0, NULL);
g_signal_connect (queue, "overrun",
G_CALLBACK (gst_adaptive_demux_stream_queue_overrun), stream);
uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
if (uri_handler == NULL) {
GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN, GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
("Missing plugin to handle URI: '%s'", uri), (NULL)); ("Missing plugin to handle URI: '%s'", uri), (NULL));
gst_object_unref (queue);
return FALSE; return FALSE;
} }
gobject_class = G_OBJECT_GET_CLASS (stream->src); gobject_class = G_OBJECT_GET_CLASS (uri_handler);
if (g_object_class_find_property (gobject_class, "compress")) if (g_object_class_find_property (gobject_class, "compress"))
g_object_set (stream->src, "compress", FALSE, NULL); g_object_set (uri_handler, "compress", FALSE, NULL);
if (g_object_class_find_property (gobject_class, "keep-alive")) if (g_object_class_find_property (gobject_class, "keep-alive"))
g_object_set (stream->src, "keep-alive", TRUE, NULL); g_object_set (uri_handler, "keep-alive", TRUE, NULL);
if (g_object_class_find_property (gobject_class, "extra-headers")) { if (g_object_class_find_property (gobject_class, "extra-headers")) {
if (referer || refresh || !allow_cache) { if (referer || refresh || !allow_cache) {
GstStructure *extra_headers = gst_structure_new_empty ("headers"); GstStructure *extra_headers = gst_structure_new_empty ("headers");
@ -2181,14 +2170,46 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING, gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
"max-age=0", NULL); "max-age=0", NULL);
g_object_set (stream->src, "extra-headers", extra_headers, NULL); g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
gst_structure_free (extra_headers); gst_structure_free (extra_headers);
} else { } else {
g_object_set (stream->src, "extra-headers", NULL, NULL); g_object_set (uri_handler, "extra-headers", NULL, NULL);
} }
} }
/* Source bin creation */
stream->src = gst_bin_new (NULL);
if (stream->src == NULL) {
gst_object_unref (queue);
gst_object_unref (uri_handler);
return FALSE;
}
gst_bin_add (GST_BIN_CAST (stream->src), queue);
gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
queue_sink = gst_element_get_static_pad (queue, "sink");
pad_link_ret =
gst_pad_link_full (uri_handler_src, queue_sink,
GST_PAD_LINK_CHECK_NOTHING);
if (GST_PAD_LINK_FAILED (pad_link_ret)) {
GST_WARNING_OBJECT (demux,
"Could not link pads %s:%s to %s:%s for reason %d",
GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
pad_link_ret);
gst_object_unref (stream->src);
stream->src = NULL;
return FALSE;
}
queue_src = gst_element_get_static_pad (queue, "src");
stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
g_object_unref (queue_src);
gst_element_add_pad (stream->src, stream->src_srcpad);
gst_element_set_locked_state (stream->src, TRUE); gst_element_set_locked_state (stream->src, TRUE);
gst_bin_add (GST_BIN_CAST (demux), stream->src); gst_bin_add (GST_BIN_CAST (demux), stream->src);
stream->src_srcpad = gst_element_get_static_pad (stream->src, "src"); stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
@ -2207,6 +2228,8 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
/* need to set query otherwise deadlocks happen with allocation queries */ /* need to set query otherwise deadlocks happen with allocation queries */
gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query); gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query);
gst_object_unref (internal_pad); gst_object_unref (internal_pad);
stream->uri_handler = uri_handler;
stream->queue = queue;
} }
return TRUE; return TRUE;
} }
@ -2235,7 +2258,9 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
* stop position */ * stop position */
if (end != -1) if (end != -1)
end += 1; end += 1;
if (!gst_element_send_event (stream->src, gst_event_new_seek (1.0, /* Send the seek event to the uri_handler, as the other pipeline elements
* can't handle it when READY. */
if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH, GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) { GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
@ -2985,10 +3010,6 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
stream->download_error_count = 0; stream->download_error_count = 0;
g_clear_error (&stream->last_error); g_clear_error (&stream->last_error);
stream->download_total_time +=
g_get_monotonic_time () - stream->download_chunk_start_time;
stream->fragment_total_time +=
g_get_monotonic_time () - stream->download_chunk_start_time;
/* FIXME - url has no indication of byte ranges for subsegments */ /* FIXME - url has no indication of byte ranges for subsegments */
gst_element_post_message (GST_ELEMENT_CAST (demux), gst_element_post_message (GST_ELEMENT_CAST (demux),

View file

@ -138,6 +138,8 @@ struct _GstAdaptiveDemuxStream
/* download tooling */ /* download tooling */
GstElement *src; GstElement *src;
GstPad *src_srcpad; GstPad *src_srcpad;
GstElement *uri_handler;
GstElement *queue;
GMutex fragment_download_lock; GMutex fragment_download_lock;
GCond fragment_download_cond; GCond fragment_download_cond;
gboolean download_finished; /* protected by fragment_download_lock */ gboolean download_finished; /* protected by fragment_download_lock */
@ -150,15 +152,6 @@ struct _GstAdaptiveDemuxStream
gint64 download_total_bytes; gint64 download_total_bytes;
guint64 current_download_rate; guint64 current_download_rate;
/* Per fragment download information */
guint64 fragment_total_time;
guint64 fragment_total_size;
/* Average for the last fragments */
guint64 moving_bitrate;
guint moving_index;
guint64 *fragment_bitrates;
GstAdaptiveDemuxStreamFragment fragment; GstAdaptiveDemuxStreamFragment fragment;
guint download_error_count; guint download_error_count;