adaptivedemux: refactor chunk downloading flow

Add more power to the chunk_received function (renamed to data_received)
and also to the fragment_finish function.

The data_received function must parse/decrypt the data if necessary and
also push it using the new push_buffer function that is exposed now. The
default implementation gets data from the stream adapter (all available)
and pushes it.

The fragment_finish function must also advance the fragment. The default
implementation only advances the fragment.

This allows the subsegment handling in dashdemux to continuously download
the same file from the server instead of stopping at every subsegment
boundary and starting a new request
This commit is contained in:
Thiago Santos 2015-01-15 17:44:45 -03:00
parent 15d51c1f6c
commit 229a15b393
6 changed files with 308 additions and 294 deletions

View file

@ -200,7 +200,7 @@ static GstFlowReturn gst_dash_demux_stream_seek (GstAdaptiveDemuxStream *
stream, GstClockTime ts);
static GstFlowReturn
gst_dash_demux_stream_advance_fragment (GstAdaptiveDemuxStream * stream);
static void
static gboolean
gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream);
static gboolean gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream *
stream, guint64 bitrate);
@ -213,8 +213,11 @@ gst_dash_demux_stream_get_fragment_waiting_time (GstAdaptiveDemuxStream *
stream);
static void gst_dash_demux_advance_period (GstAdaptiveDemux * demux);
static gboolean gst_dash_demux_has_next_period (GstAdaptiveDemux * demux);
static GstFlowReturn gst_dash_demux_chunk_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer ** chunk);
static GstFlowReturn gst_dash_demux_data_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
static GstFlowReturn
gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
/* GstDashDemux */
static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
@ -600,7 +603,8 @@ gst_dash_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf)
if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) {
klass = GST_ADAPTIVE_DEMUX_GET_CLASS (dashdemux);
klass->chunk_received = gst_dash_demux_chunk_received;
klass->data_received = gst_dash_demux_data_received;
klass->finish_fragment = gst_dash_demux_stream_fragment_finished;
}
if (gst_mpd_client_setup_media_presentation (dashdemux->client)) {
@ -882,7 +886,7 @@ gst_dash_demux_stream_seek (GstAdaptiveDemuxStream * stream, GstClockTime ts)
return GST_FLOW_OK;
}
static void
static gboolean
gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream)
{
GstDashDemuxStream *dashstream = (GstDashDemuxStream *) stream;
@ -905,6 +909,7 @@ gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream)
if (!fragment_finished) {
dashstream->sidx_current_remaining = sidx->entries[sidx->entry_index].size;
}
return !fragment_finished;
}
static GstFlowReturn
@ -914,17 +919,8 @@ gst_dash_demux_stream_advance_fragment (GstAdaptiveDemuxStream * stream)
GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (stream->demux);
if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) {
GstSidxBox *sidx = SIDX (dashstream);
if (stream->demux->segment.rate > 0.0) {
if (sidx->entry_index < sidx->entries_count) {
return GST_FLOW_OK;
}
} else {
if (sidx->entry_index >= 0) {
return GST_FLOW_OK;
}
}
if (gst_dash_demux_stream_advance_subfragment (stream))
return GST_FLOW_OK;
}
return gst_mpd_client_advance_segment (dashdemux->client,
@ -986,22 +982,14 @@ gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream * stream,
if (gst_mpd_client_has_isoff_ondemand_profile (demux->client)) {
/* a new subsegment is going to start, cleanup any pending data from the
* previous one */
/* store our current position to change to the same one in a different
* representation if needed */
dashstream->sidx_index = SIDX (dashstream)->entry_index;
if (dashstream->pending_buffer) {
gst_buffer_unref (dashstream->pending_buffer);
dashstream->pending_buffer = NULL;
}
if (ret) {
/* TODO cache indexes to avoid re-downloading and parsing */
/* if we switched, we need a new index */
gst_isoff_sidx_parser_clear (&dashstream->sidx_parser);
gst_isoff_sidx_parser_init (&dashstream->sidx_parser);
} else {
dashstream->sidx_current_remaining =
SIDX_ENTRY (dashstream, dashstream->sidx_index)->size;
}
}
@ -1071,11 +1059,6 @@ gst_dash_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
gst_isoff_sidx_parser_clear (&dashstream->sidx_parser);
gst_isoff_sidx_parser_init (&dashstream->sidx_parser);
}
if (dashstream->pending_buffer) {
gst_buffer_unref (dashstream->pending_buffer);
dashstream->pending_buffer = NULL;
}
gst_dash_demux_stream_seek (iter->data, target_pos);
}
return TRUE;
@ -1247,88 +1230,112 @@ gst_dash_demux_advance_period (GstAdaptiveDemux * demux)
}
static GstBuffer *
_gst_buffer_split (GstBuffer ** buffer, gint offset, gsize size)
_gst_buffer_split (GstBuffer * buffer, gint offset, gsize size)
{
GstBuffer *newbuf = gst_buffer_copy_region (*buffer,
GstBuffer *newbuf = gst_buffer_copy_region (buffer,
GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS | GST_BUFFER_COPY_META
| GST_BUFFER_COPY_MEMORY, offset, size - offset);
gst_buffer_resize (*buffer, 0, offset);
gst_buffer_resize (buffer, 0, offset);
return newbuf;
}
static GstFlowReturn
gst_dash_demux_chunk_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer ** chunk)
gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (demux);
if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) {
/* fragment is advanced on data_received when byte limits are reached */
return GST_FLOW_OK;
} else {
return gst_adaptive_demux_stream_advance_fragment (demux, stream,
stream->fragment.duration);
}
}
static GstFlowReturn
gst_dash_demux_data_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buffer;
gsize available;
if (*chunk == NULL) {
if (dash_stream->pending_buffer) {
*chunk = dash_stream->pending_buffer;
dash_stream->pending_buffer = NULL;
}
return GST_FLOW_OK;
}
if (dash_stream->pending_buffer) {
*chunk = gst_buffer_append (dash_stream->pending_buffer, *chunk);
dash_stream->pending_buffer = NULL;
}
if (stream->downloading_index
&& dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) {
if (stream->downloading_index) {
GstIsoffParserResult res;
guint consumed;
res =
gst_isoff_sidx_parser_add_buffer (&dash_stream->sidx_parser, *chunk,
&consumed);
available = gst_adapter_available (stream->adapter);
buffer = gst_adapter_take_buffer (stream->adapter, available);
if (res == GST_ISOFF_PARSER_ERROR) {
} else if (res == GST_ISOFF_PARSER_UNEXPECTED) {
/* this is not a 'sidx' index, just skip it and continue playback */
} else {
/* when finished, prepare for real data streaming */
if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
if (GST_CLOCK_TIME_IS_VALID (dash_stream->pending_seek_ts)) {
gst_dash_demux_stream_sidx_seek (dash_stream,
dash_stream->pending_seek_ts);
dash_stream->pending_seek_ts = GST_CLOCK_TIME_NONE;
} else {
SIDX (dash_stream)->entry_index = dash_stream->sidx_index;
if (dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) {
res =
gst_isoff_sidx_parser_add_buffer (&dash_stream->sidx_parser, buffer,
&consumed);
if (res == GST_ISOFF_PARSER_ERROR) {
} else if (res == GST_ISOFF_PARSER_UNEXPECTED) {
/* this is not a 'sidx' index, just skip it and continue playback */
} else {
/* when finished, prepare for real data streaming */
if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
if (GST_CLOCK_TIME_IS_VALID (dash_stream->pending_seek_ts)) {
gst_dash_demux_stream_sidx_seek (dash_stream,
dash_stream->pending_seek_ts);
dash_stream->pending_seek_ts = GST_CLOCK_TIME_NONE;
} else {
SIDX (dash_stream)->entry_index = dash_stream->sidx_index;
}
dash_stream->sidx_current_remaining =
SIDX_CURRENT_ENTRY (dash_stream)->size;
} else if (consumed < available) {
GstBuffer *pending;
/* we still need to keep some data around for the next parsing round
* so just push what was already processed by the parser */
pending = _gst_buffer_split (buffer, consumed, -1);
gst_adapter_push (stream->adapter, pending);
}
dash_stream->sidx_current_remaining =
SIDX_CURRENT_ENTRY (dash_stream)->size;
} else if (consumed < gst_buffer_get_size (*chunk)) {
dash_stream->pending_buffer = _gst_buffer_split (chunk, consumed, -1);
}
}
}
ret = gst_adaptive_demux_stream_push_buffer (stream, buffer);
} else if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
/* check our position in subsegments */
if (!stream->downloading_index
&& dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) {
gsize size = gst_buffer_get_size (*chunk);
while (ret == GST_FLOW_OK
&& ((available = gst_adapter_available (stream->adapter)) > 0)) {
gboolean advance = FALSE;
GST_LOG_OBJECT (stream->pad,
"Received buffer of size: %" G_GSIZE_FORMAT
" - remaining in subsegment: %" G_GSIZE_FORMAT, size,
dash_stream->sidx_current_remaining);
if (size < dash_stream->sidx_current_remaining) {
dash_stream->sidx_current_remaining -= size;
} else if (size >= dash_stream->sidx_current_remaining) {
if (size > dash_stream->sidx_current_remaining) {
dash_stream->pending_buffer =
_gst_buffer_split (chunk, dash_stream->sidx_current_remaining,
size);
if (available < dash_stream->sidx_current_remaining) {
buffer = gst_adapter_take_buffer (stream->adapter, available);
dash_stream->sidx_current_remaining -= available;
} else {
buffer =
gst_adapter_take_buffer (stream->adapter,
dash_stream->sidx_current_remaining);
dash_stream->sidx_current_remaining = 0;
advance = TRUE;
}
ret = gst_adaptive_demux_stream_push_buffer (stream, buffer);
if (advance) {
GstFlowReturn new_ret;
new_ret =
gst_adaptive_demux_stream_advance_fragment (demux, stream,
SIDX_CURRENT_ENTRY (dash_stream)->duration);
gst_dash_demux_stream_advance_subfragment (stream);
ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SUBSEGMENT_END;
/* only overwrite if it was OK before */
if (ret == GST_FLOW_OK)
ret = new_ret;
}
}
} else {
/* this should be the main header, just push it all */
ret =
gst_adaptive_demux_stream_push_buffer (stream,
gst_adapter_take_buffer (stream->adapter,
gst_adapter_available (stream->adapter)));
}
return ret;
@ -1340,6 +1347,4 @@ gst_dash_demux_stream_free (GstAdaptiveDemuxStream * stream)
GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream;
gst_isoff_sidx_parser_clear (&dash_stream->sidx_parser);
if (dash_stream->pending_buffer)
gst_buffer_unref (dash_stream->pending_buffer);
}

View file

@ -65,8 +65,6 @@ struct _GstDashDemuxStream
GstMediaFragmentInfo current_fragment;
GstBuffer *pending_buffer;
/* index parsing */
GstSidxParser sidx_parser;
gsize sidx_current_remaining;

View file

@ -111,10 +111,10 @@ static gboolean gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek);
static gboolean
gst_hls_demux_start_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
static void gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer ** buffer);
static GstFlowReturn gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer ** chunk);
static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream *
stream);
static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream *
@ -216,7 +216,7 @@ gst_hls_demux_class_init (GstHLSDemuxClass * klass)
adaptivedemux_class->start_fragment = gst_hls_demux_start_fragment;
adaptivedemux_class->finish_fragment = gst_hls_demux_finish_fragment;
adaptivedemux_class->chunk_received = gst_hls_demux_chunk_received;
adaptivedemux_class->data_received = gst_hls_demux_data_received;
GST_DEBUG_CATEGORY_INIT (gst_hls_demux_debug, "hlsdemux", 0,
"hlsdemux element");
@ -289,9 +289,6 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux));
gst_uri_downloader_reset (demux->downloader);
break;
case GST_STATE_CHANGE_NULL_TO_READY:
demux->adapter = gst_adapter_new ();
break;
default:
break;
}
@ -302,10 +299,6 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux));
break;
case GST_STATE_CHANGE_READY_TO_NULL:
gst_object_unref (demux->adapter);
demux->adapter = NULL;
break;
default:
break;
}
@ -364,11 +357,6 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
/* properly cleanup pending decryption status */
if (flags & GST_SEEK_FLAG_FLUSH) {
if (hlsdemux->adapter)
gst_adapter_clear (hlsdemux->adapter);
if (hlsdemux->pending_buffer)
gst_buffer_unref (hlsdemux->pending_buffer);
hlsdemux->pending_buffer = NULL;
gst_hls_demux_decrypt_end (hlsdemux);
}
@ -593,9 +581,9 @@ key_failed:
return FALSE;
}
static void
static GstFlowReturn
gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer ** buffer)
GstAdaptiveDemuxStream * stream)
{
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
@ -605,8 +593,8 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
/* ideally this should be empty, but this eos might have been
* caused by an error on the source element */
GST_DEBUG_OBJECT (demux, "Data still on the adapter when EOS was received"
": %" G_GSIZE_FORMAT, gst_adapter_available (hlsdemux->adapter));
gst_adapter_clear (hlsdemux->adapter);
": %" G_GSIZE_FORMAT, gst_adapter_available (stream->adapter));
gst_adapter_clear (stream->adapter);
/* pending buffer is only used for encrypted streams */
if (stream->last_ret == GST_FLOW_OK) {
@ -621,40 +609,41 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
gst_buffer_resize (hlsdemux->pending_buffer, 0, unpadded_size);
*buffer = hlsdemux->pending_buffer;
hlsdemux->pending_buffer = NULL;
gst_adaptive_demux_stream_push_buffer (stream, hlsdemux->pending_buffer);
}
} else {
if (hlsdemux->pending_buffer)
gst_buffer_unref (hlsdemux->pending_buffer);
hlsdemux->pending_buffer = NULL;
}
return gst_adaptive_demux_stream_advance_fragment (demux, stream,
stream->fragment.duration);
}
static GstFlowReturn
gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer ** chunk)
gst_hls_demux_data_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
GstBuffer *buffer = *chunk;
gsize available;
GstBuffer *buffer = NULL;
available = gst_adapter_available (stream->adapter);
/* Is it encrypted? */
if (hlsdemux->current_key) {
GError *err = NULL;
GstBuffer *tmp_buffer;
gsize available;
gst_adapter_push (hlsdemux->adapter, buffer);
*chunk = NULL;
/* must be a multiple of 16 */
available = gst_adapter_available (hlsdemux->adapter) & (~0xF);
available = available & (~0xF);
if (available == 0) {
return GST_FLOW_OK;
}
buffer = gst_adapter_take_buffer (hlsdemux->adapter, available);
buffer = gst_adapter_take_buffer (stream->adapter, available);
buffer = gst_hls_demux_decrypt_fragment (hlsdemux, buffer, &err);
if (buffer == NULL) {
GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Failed to decrypt buffer"),
@ -665,14 +654,15 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
tmp_buffer = hlsdemux->pending_buffer;
hlsdemux->pending_buffer = buffer;
*chunk = tmp_buffer;
} else if (hlsdemux->pending_buffer) {
*chunk = gst_buffer_append (hlsdemux->pending_buffer, buffer);
hlsdemux->pending_buffer = NULL;
buffer = tmp_buffer;
} else {
buffer = gst_adapter_take_buffer (stream->adapter, available);
if (hlsdemux->pending_buffer) {
buffer = gst_buffer_append (hlsdemux->pending_buffer, buffer);
hlsdemux->pending_buffer = NULL;
}
}
buffer = *chunk;
if (G_UNLIKELY (hlsdemux->do_typefind && buffer != NULL)) {
GstCaps *caps = NULL;
GstMapInfo info;
@ -697,6 +687,7 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
if (buffer_size > (2 * 1024 * 1024)) {
GST_ELEMENT_ERROR (hlsdemux, STREAM, TYPE_NOT_FOUND,
("Could not determine type of stream"), (NULL));
gst_buffer_unref (buffer);
return GST_FLOW_NOT_NEGOTIATED;
} else {
if (hlsdemux->pending_buffer)
@ -704,7 +695,6 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
gst_buffer_append (buffer, hlsdemux->pending_buffer);
else
hlsdemux->pending_buffer = buffer;
*chunk = NULL;
return GST_FLOW_OK;
}
}
@ -722,6 +712,9 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux,
hlsdemux->do_typefind = FALSE;
}
if (buffer) {
return gst_adaptive_demux_stream_push_buffer (stream, buffer);
}
return GST_FLOW_OK;
}
@ -842,8 +835,6 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux)
demux->client = gst_m3u8_client_new ("", NULL);
demux->srcpad_counter = 0;
if (demux->adapter)
gst_adapter_clear (demux->adapter);
if (demux->pending_buffer)
gst_buffer_unref (demux->pending_buffer);
demux->pending_buffer = NULL;

View file

@ -25,7 +25,6 @@
#define __GST_HLS_DEMUX_H__
#include <gst/gst.h>
#include <gst/base/gstadapter.h>
#include "m3u8.h"
#include "gstfragmented.h"
#include <gst/uridownloader/gsturidownloader.h>
@ -96,7 +95,6 @@ struct _GstHLSDemux
#endif
gchar *current_key;
guint8 *current_iv;
GstAdapter *adapter; /* used to accumulate 16 bytes multiple chunks */
GstBuffer *pending_buffer; /* decryption scenario:
* the last buffer can only be pushed when
* resized, so need to store and wait for

View file

@ -87,6 +87,11 @@ GST_DEBUG_CATEGORY (adaptivedemux_debug);
#define MAX_DOWNLOAD_ERROR_COUNT 3
#define DEFAULT_FAILED_COUNT 3
enum GstAdaptiveDemuxFlowReturn
{
GST_ADAPTIVE_DEMUX_FLOW_SWITCH = GST_FLOW_CUSTOM_SUCCESS_2 + 1
};
struct _GstAdaptiveDemuxPrivate
{
GstAdapter *input_adapter;
@ -141,9 +146,6 @@ static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstClockTime ts);
static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
demux, GstAdaptiveDemuxStream * stream);
static GstFlowReturn
gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
static GstFlowReturn
@ -171,6 +173,12 @@ static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
static void
gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
stream, GstFlowReturn ret, GError * err);
static GstFlowReturn
gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
@ -223,6 +231,9 @@ gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
gstelement_class->change_state = gst_adaptive_demux_change_state;
gstbin_class->handle_message = gst_adaptive_demux_handle_message;
klass->data_received = gst_adaptive_demux_stream_data_received_default;
klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
}
static void
@ -251,7 +262,6 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux,
g_cond_init (&demux->priv->updates_timed_cond);
g_cond_init (&demux->manifest_cond);
g_mutex_init (&demux->manifest_lock);
pad_template =
@ -299,8 +309,6 @@ gst_adaptive_demux_change_state (GstElement * element,
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_adaptive_demux_reset (demux);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
default:
break;
}
@ -704,6 +712,7 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
gst_segment_init (&stream->segment, GST_FORMAT_TIME);
g_cond_init (&stream->fragment_download_cond);
g_mutex_init (&stream->fragment_download_lock);
stream->adapter = gst_adapter_new ();
demux->next_streams = g_list_append (demux->next_streams, stream);
@ -763,6 +772,9 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
}
if (stream->pending_caps)
gst_caps_unref (stream->pending_caps);
g_object_unref (stream->adapter);
g_free (stream);
}
@ -1110,6 +1122,7 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
gst_task_join (stream->download_task);
stream->download_error_count = 0;
stream->need_header = TRUE;
gst_adapter_clear (stream->adapter);
}
gst_task_join (demux->priv->updates_task);
}
@ -1167,6 +1180,9 @@ gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemuxStream *
if (bitrate > G_MAXINT)
bitrate = G_MAXINT;
stream->current_download_rate = bitrate;
GST_DEBUG_OBJECT (stream->pad, "Bitrate: %" G_GUINT64_FORMAT
" (bytes: %" G_GINT64_FORMAT " / %" G_GINT64_FORMAT " microsecs",
bitrate, stream->download_total_bytes, stream->download_total_time);
return bitrate;
}
@ -1202,54 +1218,13 @@ gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
return GST_FLOW_OK;
}
static GstFlowReturn
_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GstFlowReturn
gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
GstBuffer * buffer)
{
GstPad *srcpad = (GstPad *) parent;
GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (srcpad);
GstAdaptiveDemux *demux = stream->demux;
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
GstFlowReturn ret = GST_FLOW_OK;
gboolean discont = FALSE;
gboolean subsegment_end = FALSE;
if (stream->starting_fragment) {
stream->starting_fragment = FALSE;
if (klass->start_fragment) {
klass->start_fragment (demux, stream);
}
GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
if (GST_BUFFER_PTS_IS_VALID (buffer))
stream->segment.position = GST_BUFFER_PTS (buffer);
} else {
GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
}
/* The subclass might need to decrypt or modify the buffer somehow
* before processing it */
if (klass->chunk_received) {
ret = klass->chunk_received (demux, stream, &buffer);
if (ret != GST_FLOW_OK) {
if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SUBSEGMENT_END) {
ret = GST_FLOW_OK;
subsegment_end = TRUE;
} else {
if (buffer)
gst_buffer_unref (buffer);
gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
return ret;
}
}
}
if (buffer == NULL)
return ret;
if (stream->first_fragment_buffer) {
if (demux->segment.rate < 0)
@ -1280,15 +1255,6 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
/* TODO what about live? how to count segments?
GST_BUFFER_OFFSET (buffer) =
gst_mpd_client_get_segment_index (stream->active_stream) - 1;
*/
/* accumulate time and size to get this chunk */
stream->download_total_time +=
g_get_monotonic_time () - stream->download_start_time;
stream->download_total_bytes += gst_buffer_get_size (buffer);
if (G_UNLIKELY (stream->pending_caps)) {
GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
@ -1310,9 +1276,69 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
stream->pending_tags = NULL;
}
ret = gst_proxy_pad_chain_default (pad, parent, buffer);
stream->download_start_time = g_get_monotonic_time ();
GST_LOG_OBJECT (pad, "Chain res: %d %s", ret, gst_flow_get_name (ret));
ret = gst_pad_push (stream->pad, buffer);
GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
gst_flow_get_name (ret));
return ret;
}
static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
return gst_adaptive_demux_stream_advance_fragment (demux, stream,
stream->fragment.duration);
}
static GstFlowReturn
gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
GstBuffer *buffer;
buffer = gst_adapter_take_buffer (stream->adapter,
gst_adapter_available (stream->adapter));
return gst_adaptive_demux_stream_push_buffer (stream, buffer);
}
static GstFlowReturn
_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstPad *srcpad = (GstPad *) parent;
GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (srcpad);
GstAdaptiveDemux *demux = stream->demux;
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
GstFlowReturn ret = GST_FLOW_OK;
if (stream->starting_fragment) {
stream->starting_fragment = FALSE;
if (klass->start_fragment) {
klass->start_fragment (demux, stream);
}
GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
if (GST_BUFFER_PTS_IS_VALID (buffer))
stream->segment.position = GST_BUFFER_PTS (buffer);
} else {
GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
}
stream->download_total_time +=
g_get_monotonic_time () - stream->download_chunk_start_time;
stream->download_total_bytes += gst_buffer_get_size (buffer);
gst_adapter_push (stream->adapter, buffer);
GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT
". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer),
gst_adapter_available (stream->adapter));
ret = klass->data_received (demux, stream);
stream->download_chunk_start_time = g_get_monotonic_time ();
if (ret != GST_FLOW_OK) {
if (ret < GST_FLOW_EOS) {
@ -1327,34 +1353,13 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
}
gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
} else if (subsegment_end) {
/* tell upstream that we are done here */
ret = GST_FLOW_EOS;
if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH)
ret = GST_FLOW_EOS; /* return EOS to make the source stop */
}
return ret;
}
static GstFlowReturn
gst_adaptive_demux_stream_fragment_eos (GstAdaptiveDemuxStream * stream)
{
GstAdaptiveDemux *demux = stream->demux;
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
GstBuffer *buffer = NULL;
GstFlowReturn ret = GST_FLOW_OK;
if (klass->finish_fragment) {
klass->finish_fragment (demux, stream, &buffer);
if (buffer) {
stream->download_total_time +=
g_get_monotonic_time () - stream->download_start_time;
stream->download_total_bytes += gst_buffer_get_size (buffer);
ret = gst_pad_push (stream->pad, buffer);
}
}
return ret;
}
static void
gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
stream, GstFlowReturn ret, GError * err)
@ -1386,9 +1391,11 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:{
GstAdaptiveDemuxClass *klass;
GstFlowReturn ret;
ret = gst_adaptive_demux_stream_fragment_eos (stream);
klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
ret = klass->finish_fragment (stream->demux, stream);
gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
break;
}
@ -1629,6 +1636,7 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
g_mutex_lock (&stream->fragment_download_lock);
if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
stream->download_start_time = g_get_monotonic_time ();
stream->download_chunk_start_time = g_get_monotonic_time ();
g_mutex_unlock (&stream->fragment_download_lock);
gst_element_sync_state_with_parent (stream->src);
g_mutex_lock (&stream->fragment_download_lock);
@ -1734,20 +1742,7 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
stream->fragment.range_start, stream->fragment.range_end);
GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d %s",
stream->last_ret, gst_flow_get_name (stream->last_ret));
if (ret == GST_FLOW_OK) {
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_element (GST_OBJECT_CAST (demux),
gst_structure_new (STATISTICS_MESSAGE_NAME,
"manifest-uri", G_TYPE_STRING,
demux->manifest_uri, "uri", G_TYPE_STRING,
url, "fragment-start-time",
GST_TYPE_CLOCK_TIME, stream->download_start_time,
"fragment-stop-time", GST_TYPE_CLOCK_TIME,
gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
stream->download_total_bytes, "fragment-download-time",
GST_TYPE_CLOCK_TIME,
stream->download_total_time * GST_USECOND, NULL)));
} else {
if (ret != GST_FLOW_OK) {
GST_INFO_OBJECT (demux, "No fragment downloaded");
/* TODO check if we are truly stoping */
if (ret != GST_FLOW_ERROR && gst_adaptive_demux_is_live (demux)) {
@ -1758,8 +1753,6 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
}
}
return ret;
no_url_error:
@ -1932,44 +1925,6 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
GST_MANIFEST_LOCK (demux);
}
if (ret == GST_FLOW_OK) {
stream->download_error_count = 0;
g_clear_error (&stream->last_error);
if (GST_CLOCK_TIME_IS_VALID (stream->fragment.duration))
stream->segment.position += stream->fragment.duration;
GST_DEBUG_OBJECT (stream->pad, "Advancing to next fragment");
if (gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
ret = gst_adaptive_demux_stream_advance_fragment (demux, stream);
GST_DEBUG_OBJECT (stream->pad, "Advanced. Result: %d %s", ret,
gst_flow_get_name (ret));
if (ret == GST_FLOW_OK) {
if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
gst_adaptive_demux_stream_update_current_bitrate (stream))) {
stream->need_header = TRUE;
}
/* the subclass might want to switch pads */
if (G_UNLIKELY (demux->next_streams)) {
gst_task_stop (stream->download_task);
/* TODO only allow switching streams if other downloads are not ongoing */
GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
"to do bitrate switching");
gst_adaptive_demux_expose_streams (demux, FALSE);
gst_adaptive_demux_start_tasks (demux);
ret = GST_FLOW_EOS;
GST_MANIFEST_UNLOCK (demux);
goto end_of_manifest;
}
}
} else {
GST_DEBUG_OBJECT (stream->pad, "No next fragment -> EOS");
ret = GST_FLOW_EOS;
}
}
stream->last_ret = ret;
switch (ret) {
case GST_FLOW_OK:
break; /* all is good, let's go */
@ -2239,15 +2194,80 @@ gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
return ret;
}
static GstFlowReturn
GstFlowReturn
gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
GstAdaptiveDemuxStream * stream, GstClockTime duration)
{
GstFlowReturn ret;
GST_MANIFEST_LOCK (demux);
if (stream->last_ret == GST_FLOW_OK) {
stream->last_ret =
gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
duration);
}
ret = stream->last_ret;
GST_MANIFEST_UNLOCK (demux);
return ret;
}
GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstClockTime duration)
{
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
GstFlowReturn ret;
g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
return klass->stream_advance_fragment (stream);
stream->download_error_count = 0;
g_clear_error (&stream->last_error);
stream->download_total_time +=
g_get_monotonic_time () - stream->download_chunk_start_time;
/* FIXME - url has no indication of byte ranges for subsegments */
gst_element_post_message (GST_ELEMENT_CAST (demux),
gst_message_new_element (GST_OBJECT_CAST (demux),
gst_structure_new (STATISTICS_MESSAGE_NAME,
"manifest-uri", G_TYPE_STRING,
demux->manifest_uri, "uri", G_TYPE_STRING,
stream->fragment.uri, "fragment-start-time",
GST_TYPE_CLOCK_TIME, stream->download_start_time,
"fragment-stop-time", GST_TYPE_CLOCK_TIME,
gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
stream->download_total_bytes, "fragment-download-time",
GST_TYPE_CLOCK_TIME,
stream->download_total_time * GST_USECOND, NULL)));
if (GST_CLOCK_TIME_IS_VALID (duration))
stream->segment.position += duration;
ret = klass->stream_advance_fragment (stream);
stream->download_start_time = stream->download_chunk_start_time =
g_get_monotonic_time ();
if (ret == GST_FLOW_OK) {
if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
gst_adaptive_demux_stream_update_current_bitrate (stream))) {
stream->need_header = TRUE;
gst_adapter_clear (stream->adapter);
ret = GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
}
/* the subclass might want to switch pads */
if (G_UNLIKELY (demux->next_streams)) {
gst_task_stop (stream->download_task);
/* TODO only allow switching streams if other downloads are not ongoing */
GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
"to do bitrate switching");
gst_adaptive_demux_expose_streams (demux, FALSE);
gst_adaptive_demux_start_tasks (demux);
ret = GST_FLOW_EOS;
}
}
return ret;
}
static gboolean

View file

@ -77,10 +77,6 @@ G_BEGIN_DECLS
g_clear_error (&err); \
} G_STMT_END
enum _GstAdaptiveDemuxFlowReturn {
GST_ADAPTIVE_DEMUX_FLOW_SUBSEGMENT_END = GST_FLOW_CUSTOM_SUCCESS + 100
};
typedef struct _GstAdaptiveDemuxStreamFragment GstAdaptiveDemuxStreamFragment;
typedef struct _GstAdaptiveDemuxStream GstAdaptiveDemuxStream;
typedef struct _GstAdaptiveDemux GstAdaptiveDemux;
@ -115,6 +111,8 @@ struct _GstAdaptiveDemuxStream
GstSegment segment;
GstAdapter *adapter;
GstCaps *pending_caps;
GstEvent *pending_segment;
GstTagList *pending_tags;
@ -141,6 +139,7 @@ struct _GstAdaptiveDemuxStream
gboolean starting_fragment;
gboolean first_fragment_buffer;
gint64 download_start_time;
gint64 download_chunk_start_time;
gint64 download_total_time;
gint64 download_total_bytes;
gint current_download_rate;
@ -335,28 +334,23 @@ struct _GstAdaptiveDemuxClass
* finish_fragment:
* @demux: #GstAdaptiveDemux
* @stream: #GstAdaptiveDemuxStream
* @buffer: Pointer to store and pending data that should be pushed.
*
* Notifies the subclass that a fragment download was finished.
* It can be used to cleanup internal state after a fragment and also
* provides a pointer for the subclass to return some pending data
* that should be pushed before starting the next fragment. This
* covers the use case of finishing the decryption of the last chunk
* of an encrypted fragment.
* It can be used to cleanup internal state after a fragment and
* also push any pending data before moving to the next fragment.
*/
void (*finish_fragment) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer ** buffer);
GstFlowReturn (*finish_fragment) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream);
/**
* chunk_received:
* data_received:
* @demux: #GstAdaptiveDemux
* @stream: #GstAdaptiveDemuxStream
* @buffer: Pointer containing the received chunk, also used to return modified data
*
* Notifies the subclass that a fragment chunk was downloaded. The subclass can
* modify the buffer and return a new one if needed. Used for decryption.
* Notifies the subclass that a fragment chunk was downloaded. The subclass
* can look at the data at the adapter and modify/push data as desired.
*
* Returns: #GST_FLOW_OK if successful, #GST_FLOW_ERROR in case of error.
*/
GstFlowReturn (*chunk_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer ** buffer);
GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream);
/**
* get_live_seek_range:
@ -385,6 +379,14 @@ void gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
GstTagList * tags);
void gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f);
GstFlowReturn gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
GstFlowReturn
gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstClockTime duration);
GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstClockTime duration);
G_END_DECLS
#endif