mssdemux: add live support

Implement the functions of adaptivedemux to have live support
enabled. This allows mssdemux to refresh the Manifest periodically
This commit is contained in:
Thiago Santos 2014-12-04 01:46:43 -03:00
parent 54a7bdc0be
commit 8c49c79d7e
3 changed files with 210 additions and 198 deletions

View file

@ -126,6 +126,8 @@ static GstClockTime gst_mss_demux_get_duration (GstAdaptiveDemux * demux);
static void gst_mss_demux_reset (GstAdaptiveDemux * demux);
static GstFlowReturn gst_mss_demux_stream_seek (GstAdaptiveDemuxStream * stream,
GstClockTime ts);
static gboolean
gst_mss_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream);
static GstFlowReturn
gst_mss_demux_stream_advance_fragment (GstAdaptiveDemuxStream * stream);
static gboolean gst_mss_demux_stream_select_bitrate (GstAdaptiveDemuxStream *
@ -133,6 +135,10 @@ static gboolean gst_mss_demux_stream_select_bitrate (GstAdaptiveDemuxStream *
static GstFlowReturn
gst_mss_demux_stream_update_fragment_info (GstAdaptiveDemuxStream * stream);
static gboolean gst_mss_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek);
static gint64
gst_mss_demux_get_manifest_update_interval (GstAdaptiveDemux * demux);
static GstFlowReturn
gst_mss_demux_update_manifest (GstAdaptiveDemux * demux, GstBuffer * buffer);
static void
gst_mss_demux_class_init (GstMssDemuxClass * klass)
@ -185,15 +191,20 @@ gst_mss_demux_class_init (GstMssDemuxClass * klass)
gstadaptivedemux_class->process_manifest = gst_mss_demux_process_manifest;
gstadaptivedemux_class->is_live = gst_mss_demux_is_live;
gstadaptivedemux_class->get_duration = gst_mss_demux_get_duration;
gstadaptivedemux_class->get_manifest_update_interval =
gst_mss_demux_get_manifest_update_interval;
gstadaptivedemux_class->reset = gst_mss_demux_reset;
gstadaptivedemux_class->seek = gst_mss_demux_seek;
gstadaptivedemux_class->stream_seek = gst_mss_demux_stream_seek;
gstadaptivedemux_class->stream_advance_fragment =
gst_mss_demux_stream_advance_fragment;
gstadaptivedemux_class->stream_has_next_fragment =
gst_mss_demux_stream_has_next_fragment;
gstadaptivedemux_class->stream_select_bitrate =
gst_mss_demux_stream_select_bitrate;
gstadaptivedemux_class->stream_update_fragment_info =
gst_mss_demux_stream_update_fragment_info;
gstadaptivedemux_class->update_manifest = gst_mss_demux_update_manifest;
GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
}
@ -447,20 +458,13 @@ gst_mss_demux_setup_streams (GstAdaptiveDemux * demux)
return TRUE;
}
static gboolean
gst_mss_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf)
static void
gst_mss_demux_update_base_url (GstMssDemux * mssdemux)
{
GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (demux);
GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (mssdemux);
gchar *baseurl_end;
#if 0
if (mssdemux->base_url == NULL) {
GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
(_("Couldn't get the Manifest's URI")),
("need to get the manifest's URI from upstream elements"));
return FALSE;
}
#endif
g_free (mssdemux->base_url);
mssdemux->base_url =
g_strdup (demux->manifest_base_uri ? demux->manifest_base_uri : demux->
@ -477,6 +481,15 @@ gst_mss_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf)
GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
}
}
static gboolean
gst_mss_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf)
{
GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (demux);
gst_mss_demux_update_base_url (mssdemux);
mssdemux->manifest = gst_mss_manifest_new (buf);
if (!mssdemux->manifest) {
GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
@ -486,57 +499,6 @@ gst_mss_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf)
return gst_mss_demux_setup_streams (demux);
}
#if 0
static void
gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
{
GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (mssdemux);
GstUriDownloader *downloader;
GstFragment *manifest_data;
GstBuffer *manifest_buffer;
gchar *baseurl_end;
downloader = gst_uri_downloader_new ();
manifest_data =
gst_uri_downloader_fetch_uri (downloader, demux->manifest_uri, NULL,
TRUE, TRUE, TRUE, NULL);
/* FIXME not really nice to unref parent's data */
g_free (demux->manifest_uri);
g_free (demux->manifest_base_uri);
demux->manifest_uri = g_strdup ((manifest_data->redirect_permanent
&& manifest_data->redirect_uri) ? manifest_data->
redirect_uri : manifest_data->uri);
demux->manifest_base_uri =
g_strdup (manifest_data->redirect_uri ? manifest_data->
redirect_uri : manifest_data->uri);
baseurl_end = g_strrstr (demux->manifest_base_uri, "/Manifest");
if (baseurl_end == NULL) {
/* second try */
baseurl_end = g_strrstr (demux->manifest_base_uri, "/manifest");
}
if (baseurl_end) {
/* set the new end of the string */
baseurl_end[0] = '\0';
} else {
GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
}
manifest_buffer = gst_fragment_get_buffer (manifest_data);
g_object_unref (manifest_data);
gst_mss_manifest_reload_fragments (mssdemux->manifest, manifest_buffer);
#if 0
gst_buffer_replace (&mssdemux->manifest_buffer, manifest_buffer);
#endif
gst_buffer_unref (manifest_buffer);
g_object_unref (downloader);
}
#endif
static gboolean
gst_mss_demux_stream_select_bitrate (GstAdaptiveDemuxStream * stream,
guint64 bitrate)
@ -597,3 +559,43 @@ gst_mss_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
return TRUE;
}
static gboolean
gst_mss_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream)
{
GstMssDemuxStream *mssstream = (GstMssDemuxStream *) stream;
return gst_mss_stream_has_next_fragment (mssstream->manifest_stream);
}
static gint64
gst_mss_demux_get_manifest_update_interval (GstAdaptiveDemux * demux)
{
GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (demux);
GstClockTime interval;
/* Not much information about this in the MSS spec. It seems that
* the fragments contain an UUID box that should tell the next
* fragments time and duration so one wouldn't need to fetch
* the Manifest again, but we need a fallback here. So use 2 times
* the current fragment duration */
interval = gst_mss_manifest_get_min_fragment_duration (mssdemux->manifest);
if (!GST_CLOCK_TIME_IS_VALID (interval))
interval = 2 * GST_SECOND; /* default to 2 seconds */
interval = 2 * (interval / GST_USECOND);
return interval;
}
static GstFlowReturn
gst_mss_demux_update_manifest (GstAdaptiveDemux * demux, GstBuffer * buffer)
{
GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (demux);
gst_mss_demux_update_base_url (mssdemux);
gst_mss_manifest_reload_fragments (mssdemux->manifest, buffer);
return GST_FLOW_OK;
}

View file

@ -98,6 +98,93 @@ struct _GstMssManifest
GSList *streams;
};
/* For parsing and building a fragments list */
typedef struct _GstMssFragmentListBuilder
{
GList *fragments;
GstMssStreamFragment *previous_fragment;
guint fragment_number;
guint64 fragment_time_accum;
} GstMssFragmentListBuilder;
static void
gst_mss_fragment_list_builder_init (GstMssFragmentListBuilder * builder)
{
builder->fragments = NULL;
builder->previous_fragment = NULL;
builder->fragment_time_accum = 0;
builder->fragment_number = 0;
}
static void
gst_mss_fragment_list_builder_add (GstMssFragmentListBuilder * builder,
xmlNodePtr node)
{
gchar *duration_str;
gchar *time_str;
gchar *seqnum_str;
gchar *repetition_str;
GstMssStreamFragment *fragment = g_new (GstMssStreamFragment, 1);
duration_str = (gchar *) xmlGetProp (node, (xmlChar *) MSS_PROP_DURATION);
time_str = (gchar *) xmlGetProp (node, (xmlChar *) MSS_PROP_TIME);
seqnum_str = (gchar *) xmlGetProp (node, (xmlChar *) MSS_PROP_NUMBER);
repetition_str =
(gchar *) xmlGetProp (node, (xmlChar *) MSS_PROP_REPETITIONS);
/* use the node's seq number or use the previous + 1 */
if (seqnum_str) {
fragment->number = g_ascii_strtoull (seqnum_str, NULL, 10);
xmlFree (seqnum_str);
builder->fragment_number = fragment->number;
} else {
fragment->number = builder->fragment_number;
}
builder->fragment_number = fragment->number + 1;
if (repetition_str) {
fragment->repetitions = g_ascii_strtoull (repetition_str, NULL, 10);
xmlFree (repetition_str);
} else {
fragment->repetitions = 1;
}
if (time_str) {
fragment->time = g_ascii_strtoull (time_str, NULL, 10);
xmlFree (time_str);
builder->fragment_time_accum = fragment->time;
} else {
fragment->time = builder->fragment_time_accum;
}
/* if we have a previous fragment, means we need to set its duration */
if (builder->previous_fragment)
builder->previous_fragment->duration =
(fragment->time -
builder->previous_fragment->time) /
builder->previous_fragment->repetitions;
if (duration_str) {
fragment->duration = g_ascii_strtoull (duration_str, NULL, 10);
builder->previous_fragment = NULL;
builder->fragment_time_accum += fragment->duration * fragment->repetitions;
xmlFree (duration_str);
} else {
/* store to set the duration at the next iteration */
builder->previous_fragment = fragment;
}
/* we reverse it later */
builder->fragments = g_list_prepend (builder->fragments, fragment);
GST_LOG ("Adding fragment number: %u, time: %" G_GUINT64_FORMAT
", duration: %" G_GUINT64_FORMAT ", repetitions: %u",
fragment->number, fragment->time, fragment->duration,
fragment->repetitions);
}
static GstBuffer *gst_buffer_from_hex_string (const gchar * s);
static gboolean
@ -146,9 +233,9 @@ static void
_gst_mss_stream_init (GstMssStream * stream, xmlNodePtr node)
{
xmlNodePtr iter;
GstMssStreamFragment *previous_fragment = NULL;
guint fragment_number = 0;
guint64 fragment_time_accum = 0;
GstMssFragmentListBuilder builder;
gst_mss_fragment_list_builder_init (&builder);
stream->xmlnode = node;
@ -158,67 +245,7 @@ _gst_mss_stream_init (GstMssStream * stream, xmlNodePtr node)
for (iter = node->children; iter; iter = iter->next) {
if (node_has_type (iter, MSS_NODE_STREAM_FRAGMENT)) {
gchar *duration_str;
gchar *time_str;
gchar *seqnum_str;
gchar *repetition_str;
GstMssStreamFragment *fragment = g_new (GstMssStreamFragment, 1);
duration_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_DURATION);
time_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_TIME);
seqnum_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_NUMBER);
repetition_str =
(gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_REPETITIONS);
/* use the node's seq number or use the previous + 1 */
if (seqnum_str) {
fragment->number = g_ascii_strtoull (seqnum_str, NULL, 10);
xmlFree (seqnum_str);
fragment_number = fragment->number;
} else {
fragment->number = fragment_number;
}
fragment_number = fragment->number + 1;
if (repetition_str) {
fragment->repetitions = g_ascii_strtoull (repetition_str, NULL, 10);
xmlFree (repetition_str);
} else {
fragment->repetitions = 1;
}
if (time_str) {
fragment->time = g_ascii_strtoull (time_str, NULL, 10);
xmlFree (time_str);
fragment_time_accum = fragment->time;
} else {
fragment->time = fragment_time_accum;
}
/* if we have a previous fragment, means we need to set its duration */
if (previous_fragment)
previous_fragment->duration =
(fragment->time -
previous_fragment->time) / previous_fragment->repetitions;
if (duration_str) {
fragment->duration = g_ascii_strtoull (duration_str, NULL, 10);
previous_fragment = NULL;
fragment_time_accum += fragment->duration * fragment->repetitions;
xmlFree (duration_str);
} else {
/* store to set the duration at the next iteration */
previous_fragment = fragment;
}
/* we reverse it later */
stream->fragments = g_list_prepend (stream->fragments, fragment);
GST_LOG ("Adding fragment number: %u, time: %" G_GUINT64_FORMAT
", duration: %" G_GUINT64_FORMAT ", repetitions: %u",
fragment->number, fragment->time, fragment->duration,
fragment->repetitions);
gst_mss_fragment_list_builder_add (&builder, iter);
} else if (node_has_type (iter, MSS_NODE_STREAM_QUALITY)) {
GstMssStreamQuality *quality = gst_mss_stream_quality_new (iter);
stream->qualities = g_list_prepend (stream->qualities, quality);
@ -227,7 +254,7 @@ _gst_mss_stream_init (GstMssStream * stream, xmlNodePtr node)
}
}
stream->fragments = g_list_reverse (stream->fragments);
stream->fragments = g_list_reverse (builder.fragments);
/* order them from smaller to bigger based on bitrates */
stream->qualities =
@ -797,6 +824,29 @@ gst_mss_manifest_get_gst_duration (GstMssManifest * manifest)
return gstdur;
}
GstClockTime
gst_mss_manifest_get_min_fragment_duration (GstMssManifest * manifest)
{
GSList *iter;
GstClockTime dur = GST_CLOCK_TIME_NONE;
GstClockTime iter_dur;
for (iter = manifest->streams; iter; iter = g_slist_next (iter)) {
GstMssStream *stream = iter->data;
iter_dur = gst_mss_stream_get_fragment_gst_duration (stream);
if (iter_dur != GST_CLOCK_TIME_NONE && iter_dur != 0) {
if (GST_CLOCK_TIME_IS_VALID (dur)) {
dur = MIN (dur, iter_dur);
} else {
dur = iter_dur;
}
}
}
return dur;
}
GstCaps *
gst_mss_stream_get_caps (GstMssStream * stream)
{
@ -893,6 +943,17 @@ gst_mss_stream_get_fragment_gst_duration (GstMssStream * stream)
timescale);
}
gboolean
gst_mss_stream_has_next_fragment (GstMssStream * stream)
{
g_return_val_if_fail (stream->active, FALSE);
if (stream->current_fragment == NULL)
return FALSE;
return TRUE;
}
GstFlowReturn
gst_mss_stream_advance_fragment (GstMssStream * stream)
{
@ -982,6 +1043,8 @@ gst_mss_stream_seek (GstMssStream * stream, guint64 time)
timescale = gst_mss_stream_get_timescale (stream);
time = gst_util_uint64_scale_round (time, timescale, GST_SECOND);
GST_DEBUG ("Stream %s seeking to %" G_GUINT64_FORMAT, stream->url, time);
for (iter = stream->fragments; iter; iter = g_list_next (iter)) {
GList *next = g_list_next (iter);
if (next) {
@ -1008,6 +1071,10 @@ gst_mss_stream_seek (GstMssStream * stream, guint64 time)
stream->fragment_repetition_index =
(time - fragment->time) / fragment->duration;
}
GST_DEBUG ("Stream %s seeked to fragment time %" G_GUINT64_FORMAT
" repetition %u", stream->url, fragment->time,
stream->fragment_repetition_index);
}
guint64
@ -1039,89 +1106,28 @@ static void
gst_mss_stream_reload_fragments (GstMssStream * stream, xmlNodePtr streamIndex)
{
xmlNodePtr iter;
GList *new_fragments = NULL;
GstMssStreamFragment *previous_fragment = NULL;
GstMssStreamFragment *current_fragment =
stream->current_fragment ? stream->current_fragment->data : NULL;
guint64 current_time = gst_mss_stream_get_fragment_gst_timestamp (stream);
guint fragment_number = 0;
guint64 fragment_time_accum = 0;
guint64 current_gst_time = gst_mss_stream_get_fragment_gst_timestamp (stream);
GstMssFragmentListBuilder builder;
if (!current_fragment && stream->fragments) {
current_fragment = g_list_last (stream->fragments)->data;
} else if (g_list_previous (stream->current_fragment)) {
/* rewind one as this is the next to be pushed */
current_fragment = g_list_previous (stream->current_fragment)->data;
} else {
current_fragment = NULL;
}
gst_mss_fragment_list_builder_init (&builder);
if (current_fragment) {
current_time = current_fragment->time;
fragment_number = current_fragment->number;
fragment_time_accum = current_fragment->time;
}
GST_DEBUG ("Current position: %" GST_TIME_FORMAT,
GST_TIME_ARGS (current_gst_time));
for (iter = streamIndex->children; iter; iter = iter->next) {
if (node_has_type (iter, MSS_NODE_STREAM_FRAGMENT)) {
gchar *duration_str;
gchar *time_str;
gchar *seqnum_str;
GstMssStreamFragment *fragment = g_new (GstMssStreamFragment, 1);
duration_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_DURATION);
time_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_TIME);
seqnum_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_NUMBER);
/* use the node's seq number or use the previous + 1 */
if (seqnum_str) {
fragment->number = g_ascii_strtoull (seqnum_str, NULL, 10);
xmlFree (seqnum_str);
} else {
fragment->number = fragment_number;
}
fragment_number = fragment->number + 1;
if (time_str) {
fragment->time = g_ascii_strtoull (time_str, NULL, 10);
xmlFree (time_str);
fragment_time_accum = fragment->time;
} else {
fragment->time = fragment_time_accum;
}
/* if we have a previous fragment, means we need to set its duration */
if (previous_fragment)
previous_fragment->duration = fragment->time - previous_fragment->time;
if (duration_str) {
fragment->duration = g_ascii_strtoull (duration_str, NULL, 10);
previous_fragment = NULL;
fragment_time_accum += fragment->duration;
xmlFree (duration_str);
} else {
/* store to set the duration at the next iteration */
previous_fragment = fragment;
}
if (fragment->time > current_time) {
new_fragments = g_list_append (new_fragments, fragment);
} else {
previous_fragment = NULL;
g_free (fragment);
}
gst_mss_fragment_list_builder_add (&builder, iter);
} else {
/* TODO gst log this */
}
}
/* store the new fragments list */
if (new_fragments) {
if (builder.fragments) {
g_list_free_full (stream->fragments, g_free);
stream->fragments = new_fragments;
stream->current_fragment = new_fragments;
stream->fragments = g_list_reverse (builder.fragments);
stream->current_fragment = stream->fragments;
gst_mss_stream_seek (stream, current_gst_time);
}
}

View file

@ -48,7 +48,10 @@ void gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time);
gboolean gst_mss_manifest_change_bitrate (GstMssManifest *manifest, guint64 bitrate);
guint64 gst_mss_manifest_get_current_bitrate (GstMssManifest * manifest);
gboolean gst_mss_manifest_is_live (GstMssManifest * manifest);
gint64 gst_mss_manifest_get_dvr_window_length (GstMssManifest * manifest);
gint gst_mss_manifest_get_look_ahead_fragments_count (GstMssManifest * manifest);
void gst_mss_manifest_reload_fragments (GstMssManifest * manifest, GstBuffer * data);
GstClockTime gst_mss_manifest_get_min_fragment_duration (GstMssManifest * manifest);
GstMssStreamType gst_mss_stream_get_type (GstMssStream *stream);
GstCaps * gst_mss_stream_get_caps (GstMssStream * stream);
@ -59,6 +62,7 @@ guint64 gst_mss_stream_get_timescale (GstMssStream * stream);
GstFlowReturn gst_mss_stream_get_fragment_url (GstMssStream * stream, gchar ** url);
GstClockTime gst_mss_stream_get_fragment_gst_timestamp (GstMssStream * stream);
GstClockTime gst_mss_stream_get_fragment_gst_duration (GstMssStream * stream);
gboolean gst_mss_stream_has_next_fragment (GstMssStream * stream);
GstFlowReturn gst_mss_stream_advance_fragment (GstMssStream * stream);
GstFlowReturn gst_mss_stream_regress_fragment (GstMssStream * stream);
void gst_mss_stream_seek (GstMssStream * stream, guint64 time);