diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index 928bfef1d4..e64e24b4a5 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -135,6 +135,7 @@ GST_DEBUG_CATEGORY (adaptivedemux_debug); #define DEFAULT_LOOKBACK_FRAGMENTS 3 #define DEFAULT_CONNECTION_SPEED 0 #define DEFAULT_BITRATE_LIMIT 0.8 +#define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024 /* For safety. Large enough to hold a segment. */ #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock)) #define GST_MANIFEST_LOCK(d) g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); @@ -1062,8 +1063,6 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad) stream->pad = pad; stream->demux = demux; - stream->fragment_bitrates = - g_malloc0 (sizeof (guint64) * demux->num_lookback_fragments); gst_pad_set_element_private (pad, stream); gst_pad_set_query_function (pad, @@ -1150,8 +1149,6 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream) g_cond_clear (&stream->fragment_download_cond); g_mutex_clear (&stream->fragment_download_lock); - g_free (stream->fragment_bitrates); - if (stream->pad) { gst_object_unref (stream->pad); stream->pad = NULL; @@ -1620,48 +1617,12 @@ gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream, stream->pending_events = g_list_append (stream->pending_events, event); } -/* must be called with manifest_lock taken */ -static guint64 -_update_average_bitrate (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream, guint64 new_bitrate) -{ - gint index = stream->moving_index % demux->num_lookback_fragments; - - stream->moving_bitrate -= stream->fragment_bitrates[index]; - stream->fragment_bitrates[index] = new_bitrate; - stream->moving_bitrate += new_bitrate; - - stream->moving_index += 1; - - if (stream->moving_index > demux->num_lookback_fragments) - return stream->moving_bitrate / demux->num_lookback_fragments; - return stream->moving_bitrate / stream->moving_index; -} - /* must be called with manifest_lock taken */ static guint64 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream) { - guint64 average_bitrate; - guint64 fragment_bitrate; - - fragment_bitrate = - (stream->fragment_total_size * 8) / - ((double) stream->fragment_total_time / G_GUINT64_CONSTANT (1000000)); - stream->fragment_total_size = 0; - stream->fragment_total_time = 0; - - average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate); - - GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT, - fragment_bitrate); - GST_INFO_OBJECT (stream, - "Last %u fragments average bitrate is %" G_GUINT64_FORMAT, - demux->num_lookback_fragments, average_bitrate); - - /* Conservative approach, make sure we don't upgrade too fast */ - stream->current_download_rate = MIN (average_bitrate, fragment_bitrate); + guint64 bitrate; if (demux->connection_speed) { GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it", @@ -1669,11 +1630,12 @@ gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux, return demux->connection_speed; } - stream->current_download_rate *= demux->bitrate_limit; - GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %" - G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate); - - return stream->current_download_rate; + g_object_get (stream->queue, "avg-in-rate", &bitrate, NULL); + bitrate *= 8; + GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps", + bitrate); + stream->current_download_rate = bitrate; + return bitrate; } /* must be called with manifest_lock taken */ @@ -1897,10 +1859,6 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) g_get_monotonic_time () - stream->download_chunk_start_time; stream->download_total_bytes += gst_buffer_get_size (buffer); - stream->fragment_total_size += gst_buffer_get_size (buffer); - stream->fragment_total_time += - g_get_monotonic_time () - stream->download_chunk_start_time; - gst_adapter_push (stream->adapter, buffer); GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT ". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer), @@ -2101,6 +2059,14 @@ gst_adaptive_demux_stream_clear_eos_and_flush_state (GstAdaptiveDemuxStream * gst_object_unref (internal_pad); } +static void +gst_adaptive_demux_stream_queue_overrun (GstElement * queue, gpointer user_data) +{ + GstAdaptiveDemuxStream *stream = (GstAdaptiveDemuxStream *) user_data; + GST_WARNING_OBJECT (stream->pad, + "Queue overrun! The fragment to download is too big according to the current queue size limit"); +} + /* must be called with manifest_lock taken */ static gboolean gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, @@ -2118,7 +2084,7 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, gchar *old_protocol, *new_protocol; gchar *old_uri; - old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->src)); + old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler)); old_protocol = gst_uri_get_protocol (old_uri); new_protocol = gst_uri_get_protocol (uri); @@ -2133,7 +2099,8 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, GError *err = NULL; GST_DEBUG_OBJECT (demux, "Re-using old source element"); - if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->src), uri, &err)) { + if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri, + &err)) { GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s", err->message); g_clear_error (&err); @@ -2150,22 +2117,44 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, } if (stream->src == NULL) { + GstPad *uri_handler_src; + GstPad *queue_sink; + GstPad *queue_src; + GstElement *uri_handler; + GstElement *queue; + GstPadLinkReturn pad_link_ret; GObjectClass *gobject_class; GstPad *internal_pad; - stream->src = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL); - if (stream->src == NULL) { + /* Our src consists of a bin containing uri_handler -> queue2 . The + * purpose of the queue2 is to allow the uri_handler to download an + * entire fragment without blocking, so we can accurately measure the + * download bitrate. */ + + queue = gst_element_factory_make ("queue2", NULL); + if (queue == NULL) + return FALSE; + + g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL); + g_object_set (queue, "max-size-buffers", (guint) 0, NULL); + g_object_set (queue, "max-size-time", (guint64) 0, NULL); + g_signal_connect (queue, "overrun", + G_CALLBACK (gst_adaptive_demux_stream_queue_overrun), stream); + + uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL); + if (uri_handler == NULL) { GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN, ("Missing plugin to handle URI: '%s'", uri), (NULL)); + gst_object_unref (queue); return FALSE; } - gobject_class = G_OBJECT_GET_CLASS (stream->src); + gobject_class = G_OBJECT_GET_CLASS (uri_handler); if (g_object_class_find_property (gobject_class, "compress")) - g_object_set (stream->src, "compress", FALSE, NULL); + g_object_set (uri_handler, "compress", FALSE, NULL); if (g_object_class_find_property (gobject_class, "keep-alive")) - g_object_set (stream->src, "keep-alive", TRUE, NULL); + g_object_set (uri_handler, "keep-alive", TRUE, NULL); if (g_object_class_find_property (gobject_class, "extra-headers")) { if (referer || refresh || !allow_cache) { GstStructure *extra_headers = gst_structure_new_empty ("headers"); @@ -2181,14 +2170,46 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING, "max-age=0", NULL); - g_object_set (stream->src, "extra-headers", extra_headers, NULL); + g_object_set (uri_handler, "extra-headers", extra_headers, NULL); gst_structure_free (extra_headers); } else { - g_object_set (stream->src, "extra-headers", NULL, NULL); + g_object_set (uri_handler, "extra-headers", NULL, NULL); } } + /* Source bin creation */ + stream->src = gst_bin_new (NULL); + if (stream->src == NULL) { + gst_object_unref (queue); + gst_object_unref (uri_handler); + return FALSE; + } + + gst_bin_add (GST_BIN_CAST (stream->src), queue); + gst_bin_add (GST_BIN_CAST (stream->src), uri_handler); + + uri_handler_src = gst_element_get_static_pad (uri_handler, "src"); + queue_sink = gst_element_get_static_pad (queue, "sink"); + + pad_link_ret = + gst_pad_link_full (uri_handler_src, queue_sink, + GST_PAD_LINK_CHECK_NOTHING); + if (GST_PAD_LINK_FAILED (pad_link_ret)) { + GST_WARNING_OBJECT (demux, + "Could not link pads %s:%s to %s:%s for reason %d", + GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink), + pad_link_ret); + gst_object_unref (stream->src); + stream->src = NULL; + return FALSE; + } + + queue_src = gst_element_get_static_pad (queue, "src"); + stream->src_srcpad = gst_ghost_pad_new ("src", queue_src); + g_object_unref (queue_src); + gst_element_add_pad (stream->src, stream->src_srcpad); + gst_element_set_locked_state (stream->src, TRUE); gst_bin_add (GST_BIN_CAST (demux), stream->src); stream->src_srcpad = gst_element_get_static_pad (stream->src, "src"); @@ -2207,6 +2228,8 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, /* need to set query otherwise deadlocks happen with allocation queries */ gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query); gst_object_unref (internal_pad); + stream->uri_handler = uri_handler; + stream->queue = queue; } return TRUE; } @@ -2235,7 +2258,9 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux, * stop position */ if (end != -1) end += 1; - if (!gst_element_send_event (stream->src, gst_event_new_seek (1.0, + /* Send the seek event to the uri_handler, as the other pipeline elements + * can't handle it when READY. */ + if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0, GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) { @@ -2985,10 +3010,6 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux, stream->download_error_count = 0; g_clear_error (&stream->last_error); - stream->download_total_time += - g_get_monotonic_time () - stream->download_chunk_start_time; - stream->fragment_total_time += - g_get_monotonic_time () - stream->download_chunk_start_time; /* FIXME - url has no indication of byte ranges for subsegments */ gst_element_post_message (GST_ELEMENT_CAST (demux), diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h index 72d78fe54a..bfc42a0226 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h @@ -138,6 +138,8 @@ struct _GstAdaptiveDemuxStream /* download tooling */ GstElement *src; GstPad *src_srcpad; + GstElement *uri_handler; + GstElement *queue; GMutex fragment_download_lock; GCond fragment_download_cond; gboolean download_finished; /* protected by fragment_download_lock */ @@ -150,15 +152,6 @@ struct _GstAdaptiveDemuxStream gint64 download_total_bytes; guint64 current_download_rate; - /* Per fragment download information */ - guint64 fragment_total_time; - guint64 fragment_total_size; - - /* Average for the last fragments */ - guint64 moving_bitrate; - guint moving_index; - guint64 *fragment_bitrates; - GstAdaptiveDemuxStreamFragment fragment; guint download_error_count;