mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-19 05:45:58 +00:00
dashdemux: remove uridownloader from fragments download
Instead, use a source element linked to a ghostpad to provide smaller buffers and more granular control for downstream buffering elements while also reducing startup latency
This commit is contained in:
parent
6f4fd70867
commit
bed3d66605
2 changed files with 379 additions and 161 deletions
|
@ -218,6 +218,8 @@ static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream
|
|||
static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
|
||||
static void gst_dash_demux_download_wait (GstDashDemuxStream * stream,
|
||||
GstClockTime time_diff);
|
||||
static void gst_dash_demux_stream_download_uri (GstDashDemux * demux,
|
||||
GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end);
|
||||
|
||||
static void gst_dash_demux_expose_streams (GstDashDemux * demux);
|
||||
static void gst_dash_demux_remove_streams (GstDashDemux * demux,
|
||||
|
@ -226,10 +228,11 @@ static void gst_dash_demux_stream_free (GstDashDemuxStream * stream);
|
|||
static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
|
||||
static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
|
||||
GstActiveStream * stream);
|
||||
static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux);
|
||||
static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux,
|
||||
GstDashDemuxStream * stream);
|
||||
|
||||
#define gst_dash_demux_parent_class parent_class
|
||||
G_DEFINE_TYPE_WITH_CODE (GstDashDemux, gst_dash_demux, GST_TYPE_ELEMENT,
|
||||
G_DEFINE_TYPE_WITH_CODE (GstDashDemux, gst_dash_demux, GST_TYPE_BIN,
|
||||
GST_DEBUG_CATEGORY_INIT (gst_dash_demux_debug, "dashdemux", 0,
|
||||
"dashdemux element");
|
||||
);
|
||||
|
@ -663,7 +666,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
|
|||
|
||||
GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps);
|
||||
streams = g_slist_prepend (streams, stream);
|
||||
stream->pad = gst_dash_demux_create_pad (demux);
|
||||
stream->pad = gst_dash_demux_create_pad (demux, stream);
|
||||
|
||||
stream_id =
|
||||
gst_pad_create_stream_id_printf (stream->pad,
|
||||
|
@ -958,6 +961,7 @@ gst_dash_demux_wait_stop (GstDashDemux * demux)
|
|||
GstDashDemuxStream *stream = iter->data;
|
||||
|
||||
gst_task_join (stream->download_task);
|
||||
gst_element_set_state (stream->src, GST_STATE_NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -979,22 +983,30 @@ gst_dash_demux_stop (GstDashDemux * demux)
|
|||
stream->need_header = TRUE;
|
||||
gst_task_stop (stream->download_task);
|
||||
GST_TASK_SIGNAL (stream->download_task);
|
||||
gst_element_set_state (stream->src, GST_STATE_READY);
|
||||
g_cond_signal (&stream->fragment_download_cond);
|
||||
gst_uri_downloader_cancel (stream->downloader);
|
||||
}
|
||||
}
|
||||
|
||||
static GstPad *
|
||||
gst_dash_demux_create_pad (GstDashDemux * demux)
|
||||
gst_dash_demux_create_pad (GstDashDemux * demux, GstDashDemuxStream * stream)
|
||||
{
|
||||
GstPad *pad;
|
||||
GstPadTemplate *tmpl;
|
||||
|
||||
tmpl = gst_static_pad_template_get (&srctemplate);
|
||||
|
||||
/* Create and activate new pads */
|
||||
pad = gst_pad_new_from_static_template (&srctemplate, NULL);
|
||||
pad = gst_ghost_pad_new_no_target_from_template (NULL, tmpl);
|
||||
gst_object_unref (tmpl);
|
||||
|
||||
gst_pad_set_event_function (pad,
|
||||
GST_DEBUG_FUNCPTR (gst_dash_demux_src_event));
|
||||
gst_pad_set_query_function (pad,
|
||||
GST_DEBUG_FUNCPTR (gst_dash_demux_src_query));
|
||||
gst_pad_set_element_private (pad, demux);
|
||||
gst_pad_set_element_private (pad, stream);
|
||||
|
||||
gst_pad_set_active (pad, TRUE);
|
||||
GST_INFO_OBJECT (demux, "Creating srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
|
||||
return pad;
|
||||
|
@ -1081,50 +1093,6 @@ gst_dash_demux_advance_period (GstDashDemux * demux)
|
|||
return TRUE;
|
||||
}
|
||||
|
||||
static GstFlowReturn
|
||||
gst_dash_demux_push (GstDashDemuxStream * stream, GstBuffer * buffer)
|
||||
{
|
||||
GstDashDemux *demux = stream->demux;
|
||||
GstClockTime timestamp, duration;
|
||||
GstFlowReturn ret = GST_FLOW_OK;
|
||||
|
||||
timestamp = GST_BUFFER_TIMESTAMP (buffer);
|
||||
|
||||
if (stream->pending_segment) {
|
||||
if (demux->timestamp_offset == -1)
|
||||
demux->timestamp_offset = timestamp;
|
||||
else
|
||||
demux->timestamp_offset = MIN (timestamp, demux->timestamp_offset);
|
||||
|
||||
/* And send a newsegment */
|
||||
gst_pad_push_event (stream->pad, stream->pending_segment);
|
||||
stream->pending_segment = NULL;
|
||||
}
|
||||
|
||||
/* make timestamp start from 0 by subtracting the offset */
|
||||
timestamp -= demux->timestamp_offset;
|
||||
duration = GST_BUFFER_DURATION (buffer);
|
||||
|
||||
GST_BUFFER_TIMESTAMP (buffer) = timestamp;
|
||||
|
||||
GST_DEBUG_OBJECT (stream->pad,
|
||||
"Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%"
|
||||
GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT, buffer,
|
||||
GST_BUFFER_OFFSET (buffer), stream->index,
|
||||
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
|
||||
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
|
||||
ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer));
|
||||
GST_DEBUG_OBJECT (stream->pad, "Push result: %d %s", ret,
|
||||
gst_flow_get_name (ret));
|
||||
|
||||
demux->segment.position = timestamp;
|
||||
stream->position = timestamp;
|
||||
if (GST_CLOCK_TIME_IS_VALID (duration))
|
||||
stream->position += duration;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_dash_demux_stream_free (GstDashDemuxStream * stream)
|
||||
{
|
||||
|
@ -1148,6 +1116,20 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream)
|
|||
g_cond_clear (&stream->download_cond);
|
||||
g_mutex_clear (&stream->download_mutex);
|
||||
|
||||
if (stream->src_srcpad) {
|
||||
gst_object_unref (stream->src_srcpad);
|
||||
stream->src_srcpad = NULL;
|
||||
}
|
||||
|
||||
if (stream->src) {
|
||||
gst_element_set_state (stream->src, GST_STATE_NULL);
|
||||
gst_bin_remove (GST_BIN_CAST (stream->demux), stream->src);
|
||||
stream->src = NULL;
|
||||
}
|
||||
|
||||
g_cond_clear (&stream->fragment_download_cond);
|
||||
g_mutex_clear (&stream->fragment_download_lock);
|
||||
|
||||
if (stream->downloader != NULL) {
|
||||
g_object_unref (stream->downloader);
|
||||
}
|
||||
|
@ -1655,14 +1637,12 @@ gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static GstBuffer *
|
||||
static void
|
||||
gst_dash_demux_download_header_fragment (GstDashDemux * demux,
|
||||
GstDashDemuxStream * stream, gchar * path, gint64 range_start,
|
||||
gint64 range_end)
|
||||
{
|
||||
GstBuffer *buffer = NULL;
|
||||
gchar *next_header_uri;
|
||||
GstFragment *fragment;
|
||||
|
||||
if (strncmp (path, "http://", 7) != 0) {
|
||||
next_header_uri =
|
||||
|
@ -1673,56 +1653,37 @@ gst_dash_demux_download_header_fragment (GstDashDemux * demux,
|
|||
next_header_uri = path;
|
||||
}
|
||||
|
||||
fragment = gst_uri_downloader_fetch_uri_with_range (stream->downloader,
|
||||
next_header_uri, demux->client->mpd_uri, FALSE, FALSE, TRUE, range_start,
|
||||
range_end, NULL);
|
||||
gst_dash_demux_stream_download_uri (demux, stream, next_header_uri,
|
||||
range_start, range_end);
|
||||
g_free (next_header_uri);
|
||||
if (fragment) {
|
||||
buffer = gst_fragment_get_buffer (fragment);
|
||||
g_object_unref (fragment);
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
static GstBuffer *
|
||||
static void
|
||||
gst_dash_demux_get_next_header (GstDashDemux * demux,
|
||||
GstDashDemuxStream * stream)
|
||||
{
|
||||
gchar *initializationURL;
|
||||
GstBuffer *header_buffer, *index_buffer = NULL;
|
||||
gint64 range_start, range_end;
|
||||
|
||||
if (!gst_mpd_client_get_next_header (demux->client, &initializationURL,
|
||||
stream->index, &range_start, &range_end))
|
||||
return NULL;
|
||||
return;
|
||||
|
||||
GST_INFO_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
|
||||
G_GINT64_FORMAT, initializationURL, range_start, range_end);
|
||||
header_buffer = gst_dash_demux_download_header_fragment (demux, stream,
|
||||
gst_dash_demux_download_header_fragment (demux, stream,
|
||||
initializationURL, range_start, range_end);
|
||||
|
||||
/* check if we have an index */
|
||||
if (header_buffer
|
||||
if (!demux->cancelled && stream->last_ret == GST_FLOW_OK /* TODO check for other valid types */
|
||||
&& gst_mpd_client_get_next_header_index (demux->client,
|
||||
&initializationURL, stream->index, &range_start, &range_end)) {
|
||||
GST_INFO_OBJECT (demux,
|
||||
"Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
|
||||
initializationURL, range_start, range_end);
|
||||
index_buffer =
|
||||
gst_dash_demux_download_header_fragment (demux, stream,
|
||||
gst_dash_demux_download_header_fragment (demux, stream,
|
||||
initializationURL, range_start, range_end);
|
||||
}
|
||||
|
||||
if (header_buffer == NULL) {
|
||||
GST_WARNING_OBJECT (demux, "Unable to fetch header");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (index_buffer) {
|
||||
header_buffer = gst_buffer_append (header_buffer, index_buffer);
|
||||
}
|
||||
|
||||
return header_buffer;
|
||||
}
|
||||
|
||||
static GstCaps *
|
||||
|
@ -1847,19 +1808,283 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux,
|
|||
}
|
||||
}
|
||||
|
||||
static GstBuffer *
|
||||
static GstFlowReturn
|
||||
_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
|
||||
{
|
||||
GstPad *srcpad = (GstPad *) parent;
|
||||
GstDashDemux *demux = (GstDashDemux *) GST_PAD_PARENT (srcpad);
|
||||
GstDashDemuxStream *stream = gst_pad_get_element_private (srcpad);
|
||||
GstFlowReturn ret;
|
||||
gboolean discont = FALSE;
|
||||
|
||||
if (stream->starting_fragment) {
|
||||
if (demux->segment.rate < 0)
|
||||
/* Set DISCONT flag for every first buffer in reverse playback mode
|
||||
* as each fragment for its own has to be reversed */
|
||||
discont = TRUE;
|
||||
stream->starting_fragment = FALSE;
|
||||
|
||||
GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
|
||||
GST_TIME_ARGS (stream->current_fragment.timestamp));
|
||||
|
||||
GST_BUFFER_PTS (buffer) = stream->current_fragment.timestamp;
|
||||
} else {
|
||||
GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
|
||||
}
|
||||
|
||||
if (discont) {
|
||||
GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
|
||||
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
|
||||
} else {
|
||||
GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
|
||||
}
|
||||
|
||||
|
||||
GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
|
||||
GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
|
||||
GST_BUFFER_OFFSET (buffer) =
|
||||
gst_mpd_client_get_segment_index (stream->active_stream) - 1;
|
||||
|
||||
if (stream->pending_segment) {
|
||||
if (demux->timestamp_offset == -1)
|
||||
demux->timestamp_offset = GST_BUFFER_PTS (buffer);
|
||||
else
|
||||
demux->timestamp_offset =
|
||||
MIN (GST_BUFFER_PTS (buffer), demux->timestamp_offset);
|
||||
|
||||
/* And send a newsegment */
|
||||
gst_pad_push_event (stream->pad, stream->pending_segment);
|
||||
stream->pending_segment = NULL;
|
||||
}
|
||||
|
||||
/* make timestamp start from 0 by subtracting the offset */
|
||||
GST_BUFFER_PTS (buffer) -= demux->timestamp_offset;
|
||||
|
||||
stream->position = demux->segment.position = GST_BUFFER_PTS (buffer);
|
||||
|
||||
/* accumulate time and size to get this chunk */
|
||||
stream->download_total_time +=
|
||||
g_get_monotonic_time () - stream->download_start_time;
|
||||
stream->download_total_bytes += gst_buffer_get_size (buffer);
|
||||
|
||||
ret = gst_proxy_pad_chain_default (pad, parent, buffer);
|
||||
stream->download_start_time = g_get_monotonic_time ();
|
||||
GST_LOG_OBJECT (pad, "Chain res: %d %s", ret, gst_flow_get_name (ret));
|
||||
|
||||
if (ret != GST_FLOW_OK) {
|
||||
if (ret < GST_FLOW_EOS) {
|
||||
GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
|
||||
("stream stopped, reason %s", gst_flow_get_name (ret)));
|
||||
|
||||
/* TODO push this on all pads */
|
||||
gst_pad_push_event (stream->pad, gst_event_new_eos ());
|
||||
} else {
|
||||
GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
|
||||
gst_flow_get_name (ret));
|
||||
}
|
||||
|
||||
/* TODO properly stop tasks */
|
||||
/* gst_hls_demux_pause_tasks (demux); */
|
||||
}
|
||||
|
||||
/* avoid having the source handle the same error again */
|
||||
stream->last_ret = ret;
|
||||
ret = GST_FLOW_OK;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
|
||||
{
|
||||
GstPad *srcpad = GST_PAD_CAST (parent);
|
||||
GstDashDemuxStream *stream = gst_pad_get_element_private (srcpad);
|
||||
|
||||
switch (GST_EVENT_TYPE (event)) {
|
||||
case GST_EVENT_EOS:
|
||||
g_cond_signal (&stream->fragment_download_cond);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
gst_event_unref (event);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
|
||||
{
|
||||
switch (GST_QUERY_TYPE (query)) {
|
||||
case GST_QUERY_ALLOCATION:
|
||||
return FALSE;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return gst_pad_query_default (pad, parent, query);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_dash_demux_stream_update_source (GstDashDemuxStream * stream,
|
||||
const gchar * uri, const gchar * referer, gboolean refresh,
|
||||
gboolean allow_cache)
|
||||
{
|
||||
GstDashDemux *demux = stream->demux;
|
||||
|
||||
if (!gst_uri_is_valid (uri))
|
||||
return FALSE;
|
||||
|
||||
if (stream->src != NULL) {
|
||||
gchar *old_protocol, *new_protocol;
|
||||
gchar *old_uri;
|
||||
|
||||
old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->src));
|
||||
old_protocol = gst_uri_get_protocol (old_uri);
|
||||
new_protocol = gst_uri_get_protocol (uri);
|
||||
|
||||
if (!g_str_equal (old_protocol, new_protocol)) {
|
||||
gst_object_unref (stream->src_srcpad);
|
||||
gst_element_set_state (stream->src, GST_STATE_NULL);
|
||||
gst_bin_remove (GST_BIN_CAST (demux), stream->src);
|
||||
stream->src = NULL;
|
||||
stream->src_srcpad = NULL;
|
||||
GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
|
||||
} else {
|
||||
GError *err = NULL;
|
||||
|
||||
GST_DEBUG_OBJECT (demux, "Re-using old source element");
|
||||
if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->src), uri, &err)) {
|
||||
GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
|
||||
err->message);
|
||||
g_clear_error (&err);
|
||||
gst_element_set_state (stream->src, GST_STATE_NULL);
|
||||
gst_bin_remove (GST_BIN_CAST (demux), stream->src);
|
||||
stream->src = NULL;
|
||||
}
|
||||
}
|
||||
g_free (old_uri);
|
||||
g_free (old_protocol);
|
||||
g_free (new_protocol);
|
||||
}
|
||||
|
||||
if (stream->src == NULL) {
|
||||
GObjectClass *gobject_class;
|
||||
GstPad *internal_pad;
|
||||
|
||||
stream->src = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
|
||||
if (stream->src == NULL) {
|
||||
GST_WARNING_OBJECT (demux, "No element to handle uri: %s", uri);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
gobject_class = G_OBJECT_GET_CLASS (stream->src);
|
||||
|
||||
if (g_object_class_find_property (gobject_class, "compress"))
|
||||
g_object_set (stream->src, "compress", FALSE, NULL);
|
||||
if (g_object_class_find_property (gobject_class, "keep-alive"))
|
||||
g_object_set (stream->src, "keep-alive", TRUE, NULL);
|
||||
if (g_object_class_find_property (gobject_class, "extra-headers")) {
|
||||
if (referer || refresh || !allow_cache) {
|
||||
GstStructure *extra_headers = gst_structure_new_empty ("headers");
|
||||
|
||||
if (referer)
|
||||
gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
|
||||
NULL);
|
||||
|
||||
if (!allow_cache)
|
||||
gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
|
||||
"no-cache", NULL);
|
||||
else if (refresh)
|
||||
gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
|
||||
"max-age=0", NULL);
|
||||
|
||||
g_object_set (stream->src, "extra-headers", extra_headers, NULL);
|
||||
|
||||
gst_structure_free (extra_headers);
|
||||
} else {
|
||||
g_object_set (stream->src, "extra-headers", NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
gst_element_set_locked_state (stream->src, TRUE);
|
||||
gst_bin_add (GST_BIN_CAST (demux), stream->src);
|
||||
stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
|
||||
|
||||
gst_ghost_pad_set_target (GST_GHOST_PAD_CAST (stream->pad),
|
||||
stream->src_srcpad);
|
||||
|
||||
/* set up our internal pad to drop all events from
|
||||
* the http src we don't care about. On the chain function
|
||||
* we just push the buffer forward, but this way dash can get
|
||||
* the flow return from downstream */
|
||||
internal_pad =
|
||||
GST_PAD_CAST (gst_proxy_pad_get_internal (GST_PROXY_PAD (stream->pad)));
|
||||
gst_pad_set_chain_function (GST_PAD_CAST (internal_pad), _src_chain);
|
||||
gst_pad_set_event_function (GST_PAD_CAST (internal_pad), _src_event);
|
||||
/* need to set query otherwise deadlocks happen with allocation queries */
|
||||
gst_pad_set_query_function (GST_PAD_CAST (internal_pad), _src_query);
|
||||
gst_object_unref (internal_pad);
|
||||
}
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
/* must be called with the stream's fragment_download_lock */
|
||||
static void
|
||||
gst_dash_demux_stream_download_uri (GstDashDemux * demux,
|
||||
GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end)
|
||||
{
|
||||
GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT
|
||||
" - %" G_GINT64_FORMAT, uri, start, end);
|
||||
|
||||
/* TODO check return */
|
||||
gst_dash_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE);
|
||||
if (gst_element_set_state (stream->src,
|
||||
GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
|
||||
if (start != 0 || end != -1) {
|
||||
if (!gst_element_send_event (stream->src, gst_event_new_seek (1.0,
|
||||
GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
|
||||
GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
|
||||
|
||||
/* looks like the source can't handle seeks in READY */
|
||||
/*
|
||||
*err = g_error_new (GST_CORE_ERROR, GST_CORE_ERROR_NOT_IMPLEMENTED,
|
||||
"Source element can't handle range requests");
|
||||
*/
|
||||
stream->last_ret = GST_FLOW_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
|
||||
/* flush the proxypads so that the EOS state is reset */
|
||||
gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_start ());
|
||||
gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_stop (TRUE));
|
||||
|
||||
stream->download_start_time = g_get_monotonic_time ();
|
||||
gst_element_sync_state_with_parent (stream->src);
|
||||
|
||||
/* wait for the fragment to be completely downloaded */
|
||||
GST_DEBUG_OBJECT (stream->pad,
|
||||
"Waiting for fragment download to finish: %s", uri);
|
||||
g_cond_wait (&stream->fragment_download_cond,
|
||||
&stream->fragment_download_lock);
|
||||
}
|
||||
} else {
|
||||
stream->last_ret = GST_FLOW_CUSTOM_ERROR;
|
||||
}
|
||||
|
||||
gst_element_set_state (stream->src, GST_STATE_READY);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
|
||||
GstDashDemuxStream * stream, guint64 * size_buffer,
|
||||
GstClockTime * download_time)
|
||||
GstDashDemuxStream * stream)
|
||||
{
|
||||
GstActiveStream *active_stream;
|
||||
GstFragment *download;
|
||||
GTimeVal now;
|
||||
GTimeVal start;
|
||||
guint stream_idx = stream->index;
|
||||
GstBuffer *buffer = NULL;
|
||||
GstBuffer *header_buffer;
|
||||
GstMediaFragmentInfo fragment;
|
||||
GstMediaFragmentInfo *fragment = &stream->current_fragment;
|
||||
|
||||
if (G_UNLIKELY (stream->restart_download)) {
|
||||
GstClockTime cur, ts;
|
||||
|
@ -1914,84 +2139,72 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
|
|||
stream->restart_download = FALSE;
|
||||
}
|
||||
|
||||
if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) {
|
||||
g_get_current_time (&start);
|
||||
g_mutex_lock (&stream->fragment_download_lock);
|
||||
if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, fragment)) {
|
||||
GST_INFO_OBJECT (stream->pad,
|
||||
"Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
|
||||
GST_TIME_FORMAT " Range:%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
|
||||
fragment.uri, GST_TIME_ARGS (fragment.timestamp),
|
||||
GST_TIME_ARGS (fragment.duration),
|
||||
fragment.range_start, fragment.range_end);
|
||||
fragment->uri, GST_TIME_ARGS (fragment->timestamp),
|
||||
GST_TIME_ARGS (fragment->duration),
|
||||
fragment->range_start, fragment->range_end);
|
||||
|
||||
download = gst_uri_downloader_fetch_uri_with_range (stream->downloader,
|
||||
fragment.uri, demux->client->mpd_uri, FALSE, FALSE, TRUE,
|
||||
fragment.range_start, fragment.range_end, NULL);
|
||||
if (stream->need_header) {
|
||||
/* We need to fetch a new header */
|
||||
gst_dash_demux_get_next_header (demux, stream);
|
||||
stream->need_header = FALSE;
|
||||
}
|
||||
|
||||
if (download == NULL) {
|
||||
gst_media_fragment_info_clear (&fragment);
|
||||
return NULL;
|
||||
/* it is possible to have an index per fragment, so check and download */
|
||||
if (fragment->index_uri || fragment->index_range_start
|
||||
|| fragment->index_range_end != -1) {
|
||||
const gchar *uri = fragment->index_uri;
|
||||
|
||||
if (!uri) /* fallback to default media uri */
|
||||
uri = fragment->uri;
|
||||
|
||||
GST_DEBUG_OBJECT (stream->pad,
|
||||
"Fragment index download: %s %" G_GINT64_FORMAT "-%"
|
||||
G_GINT64_FORMAT, uri, fragment->index_range_start,
|
||||
fragment->index_range_end);
|
||||
gst_dash_demux_stream_download_uri (demux, stream, uri,
|
||||
fragment->index_range_start, fragment->index_range_end);
|
||||
}
|
||||
|
||||
if (demux->cancelled)
|
||||
goto exit;
|
||||
|
||||
/* now get the real fragment */
|
||||
gst_dash_demux_stream_download_uri (demux, stream, fragment->uri,
|
||||
fragment->range_start, fragment->range_end);
|
||||
|
||||
if (stream->last_ret < GST_FLOW_EOS) {
|
||||
gst_media_fragment_info_clear (fragment);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
active_stream = stream->active_stream;
|
||||
if (active_stream == NULL) {
|
||||
gst_media_fragment_info_clear (&fragment);
|
||||
g_object_unref (download);
|
||||
return NULL;
|
||||
gst_media_fragment_info_clear (fragment);
|
||||
stream->last_ret = GST_FLOW_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
buffer = gst_fragment_get_buffer (download);
|
||||
g_object_unref (download);
|
||||
|
||||
/* it is possible to have an index per fragment, so check and download */
|
||||
if (fragment.index_uri || fragment.index_range_start
|
||||
|| fragment.index_range_end != -1) {
|
||||
const gchar *uri = fragment.index_uri;
|
||||
GstBuffer *index_buffer;
|
||||
|
||||
if (!uri) /* fallback to default media uri */
|
||||
uri = fragment.uri;
|
||||
|
||||
GST_DEBUG_OBJECT (stream->pad,
|
||||
"Fragment index download: %s %" G_GINT64_FORMAT "-%"
|
||||
G_GINT64_FORMAT, uri, fragment.index_range_start,
|
||||
fragment.index_range_end);
|
||||
download =
|
||||
gst_uri_downloader_fetch_uri_with_range (stream->downloader, uri,
|
||||
demux->client->mpd_uri, FALSE, FALSE, TRUE,
|
||||
fragment.index_range_start, fragment.index_range_end, NULL);
|
||||
if (download) {
|
||||
index_buffer = gst_fragment_get_buffer (download);
|
||||
if (index_buffer)
|
||||
buffer = gst_buffer_append (index_buffer, buffer);
|
||||
g_object_unref (download);
|
||||
}
|
||||
if (stream->last_ret == GST_FLOW_OK) {
|
||||
stream->position += fragment->duration;
|
||||
}
|
||||
|
||||
if (stream->need_header) {
|
||||
/* We need to fetch a new header */
|
||||
if ((header_buffer =
|
||||
gst_dash_demux_get_next_header (demux, stream)) != NULL) {
|
||||
buffer = gst_buffer_append (header_buffer, buffer);
|
||||
}
|
||||
stream->need_header = FALSE;
|
||||
}
|
||||
g_get_current_time (&now);
|
||||
*download_time = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start));
|
||||
|
||||
buffer = gst_buffer_make_writable (buffer);
|
||||
|
||||
GST_BUFFER_TIMESTAMP (buffer) = fragment.timestamp;
|
||||
GST_BUFFER_DURATION (buffer) = fragment.duration;
|
||||
GST_BUFFER_OFFSET (buffer) =
|
||||
gst_mpd_client_get_segment_index (active_stream) - 1;
|
||||
|
||||
gst_media_fragment_info_clear (&fragment);
|
||||
*size_buffer += gst_buffer_get_size (buffer);
|
||||
gst_media_fragment_info_clear (fragment);
|
||||
} else {
|
||||
GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d",
|
||||
stream, stream->index);
|
||||
}
|
||||
return buffer;
|
||||
|
||||
|
||||
exit:
|
||||
g_mutex_unlock (&stream->fragment_download_lock);
|
||||
|
||||
/* TODO a race can set it to READY after it was set to NULL */
|
||||
gst_element_set_state (stream->src, GST_STATE_READY);
|
||||
}
|
||||
|
||||
/* gst_dash_demux_stream_get_next_fragment:
|
||||
|
@ -2008,9 +2221,6 @@ static GstFlowReturn
|
|||
gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream,
|
||||
GstClockTime * ts)
|
||||
{
|
||||
guint64 buffer_size = 0;
|
||||
GstClockTime diff;
|
||||
GstBuffer *buffer = NULL;
|
||||
GstFlowReturn ret = GST_FLOW_OK;
|
||||
GstDashDemux *demux = stream->demux;
|
||||
|
||||
|
@ -2048,19 +2258,15 @@ gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream,
|
|||
}
|
||||
|
||||
/* Get the fragment corresponding to each stream index */
|
||||
buffer =
|
||||
gst_dash_demux_stream_download_fragment (demux, stream,
|
||||
&buffer_size, &diff);
|
||||
gst_dash_demux_stream_download_fragment (demux, stream);
|
||||
|
||||
demux->end_of_period = FALSE;
|
||||
|
||||
if (buffer) {
|
||||
ret = gst_dash_demux_push (stream, buffer);
|
||||
} else {
|
||||
if (stream->last_ret < GST_FLOW_EOS) {
|
||||
GST_WARNING_OBJECT (stream->pad, "Failed to download fragment");
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (buffer_size > 0 && diff > 0) {
|
||||
#ifndef GST_DISABLE_GST_DEBUG
|
||||
guint64 brate;
|
||||
|
@ -2076,6 +2282,7 @@ gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream,
|
|||
G_GUINT64_FORMAT " Ko in %.2f s)", stream->index, brate / 1000,
|
||||
buffer_size / 1024, ((double) diff / GST_SECOND));
|
||||
}
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -81,6 +81,17 @@ struct _GstDashDemuxStream
|
|||
GstUriDownloader *downloader;
|
||||
|
||||
GstDownloadRate dnl_rate;
|
||||
|
||||
/* download tooling */
|
||||
GstElement *src;
|
||||
GstPad *src_srcpad;
|
||||
GMutex fragment_download_lock;
|
||||
GCond fragment_download_cond;
|
||||
GstMediaFragmentInfo current_fragment;
|
||||
gboolean starting_fragment;
|
||||
gint64 download_start_time;
|
||||
gint64 download_total_time;
|
||||
gint64 download_total_bytes;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -90,7 +101,7 @@ struct _GstDashDemuxStream
|
|||
*/
|
||||
struct _GstDashDemux
|
||||
{
|
||||
GstElement parent;
|
||||
GstBin parent;
|
||||
GstPad *sinkpad;
|
||||
|
||||
gboolean have_group_id;
|
||||
|
@ -123,7 +134,7 @@ struct _GstDashDemux
|
|||
|
||||
struct _GstDashDemuxClass
|
||||
{
|
||||
GstElementClass parent_class;
|
||||
GstBinClass parent_class;
|
||||
};
|
||||
|
||||
GType gst_dash_demux_get_type (void);
|
||||
|
|
Loading…
Reference in a new issue