mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-19 14:56:36 +00:00
mssdemux: implement live streams handling
Live streams force the demuxer to keep reloading the Manifest from time to time, as the new fragments are being added as they are recorded. The demuxer should also try to keep up and detect when it had to skip fragments, marking the discont flag when that happens. Curiously, the spec doesn't seem to mention when/how a live stream is supposed to end, so keep trying downloads until the demuxer errors out.
This commit is contained in:
parent
0704f15103
commit
aa1713068a
4 changed files with 220 additions and 9 deletions
|
@ -258,6 +258,7 @@ gst_mss_demux_reset (GstMssDemux * mssdemux)
|
|||
|
||||
mssdemux->n_videos = mssdemux->n_audios = 0;
|
||||
g_free (mssdemux->base_url);
|
||||
g_free (mssdemux->manifest_uri);
|
||||
mssdemux->base_url = NULL;
|
||||
}
|
||||
|
||||
|
@ -562,14 +563,24 @@ gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
|
|||
GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
|
||||
break;
|
||||
}
|
||||
case GST_QUERY_LATENCY:
|
||||
gst_query_set_latency (query, FALSE, 0, -1);
|
||||
case GST_QUERY_LATENCY:{
|
||||
gboolean live = FALSE;
|
||||
|
||||
live = mssdemux->manifest
|
||||
&& gst_mss_manifest_is_live (mssdemux->manifest);
|
||||
|
||||
gst_query_set_latency (query, live, 0, -1);
|
||||
ret = TRUE;
|
||||
break;
|
||||
}
|
||||
case GST_QUERY_SEEKING:{
|
||||
GstFormat fmt;
|
||||
gint64 stop = -1;
|
||||
|
||||
if (mssdemux->manifest && gst_mss_manifest_is_live (mssdemux->manifest)) {
|
||||
return FALSE; /* no live seeking */
|
||||
}
|
||||
|
||||
gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
|
||||
GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
|
||||
fmt);
|
||||
|
@ -728,6 +739,7 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
|
|||
gst_query_parse_uri (query, &uri);
|
||||
GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
|
||||
|
||||
mssdemux->manifest_uri = g_strdup (uri);
|
||||
baseurl_end = g_strrstr (uri, "/Manifest");
|
||||
if (baseurl_end) {
|
||||
/* set the new end of the string */
|
||||
|
@ -754,6 +766,9 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
|
|||
return FALSE;
|
||||
}
|
||||
|
||||
GST_INFO_OBJECT (mssdemux, "Live stream: %d",
|
||||
gst_mss_manifest_is_live (mssdemux->manifest));
|
||||
|
||||
gst_mss_demux_create_streams (mssdemux);
|
||||
for (iter = mssdemux->streams; iter;) {
|
||||
GSList *current = iter;
|
||||
|
@ -779,6 +794,27 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
|
|||
return TRUE;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
|
||||
{
|
||||
GstUriDownloader *downloader;
|
||||
GstFragment *manifest_data;
|
||||
GstBuffer *manifest_buffer;
|
||||
|
||||
downloader = gst_uri_downloader_new ();
|
||||
|
||||
manifest_data =
|
||||
gst_uri_downloader_fetch_uri (downloader, mssdemux->manifest_uri);
|
||||
manifest_buffer = gst_fragment_get_buffer (manifest_data);
|
||||
g_object_unref (manifest_data);
|
||||
|
||||
gst_mss_manifest_reload_fragments (mssdemux->manifest, manifest_buffer);
|
||||
gst_buffer_replace (&mssdemux->manifest_buffer, manifest_buffer);
|
||||
gst_buffer_unref (manifest_buffer);
|
||||
|
||||
g_object_unref (downloader);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
|
||||
{
|
||||
|
@ -911,6 +947,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
|
|||
case GST_FLOW_OK:
|
||||
break; /* all is good, let's go */
|
||||
case GST_FLOW_UNEXPECTED: /* EOS */
|
||||
gst_mss_demux_reload_manifest (mssdemux);
|
||||
return GST_FLOW_OK;
|
||||
return GST_FLOW_UNEXPECTED;
|
||||
case GST_FLOW_ERROR:
|
||||
goto error;
|
||||
|
@ -924,6 +962,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
|
|||
|
||||
url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
|
||||
|
||||
GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream);
|
||||
|
||||
fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
|
||||
g_free (path);
|
||||
g_free (url);
|
||||
|
@ -931,6 +971,11 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
|
|||
if (!fragment) {
|
||||
GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
|
||||
/* TODO check if we are truly stoping */
|
||||
if (gst_mss_manifest_is_live (mssdemux->manifest)) {
|
||||
/* looks like there is no way of knowing when a live stream has ended
|
||||
* Have to assume we are falling behind and cause a manifest reload */
|
||||
return GST_FLOW_OK;
|
||||
}
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
|
||||
|
@ -949,9 +994,11 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
|
|||
|
||||
if (_buffer) {
|
||||
GST_DEBUG_OBJECT (mssdemux,
|
||||
"Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT,
|
||||
"Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
|
||||
" Duration: %" GST_TIME_FORMAT,
|
||||
stream, GST_PAD_NAME (stream->pad),
|
||||
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)));
|
||||
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
|
||||
GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
|
||||
gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
|
||||
}
|
||||
|
||||
|
@ -995,9 +1042,9 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
|
|||
break;
|
||||
}
|
||||
|
||||
g_assert (buffer != NULL);
|
||||
|
||||
gst_mss_stream_advance_fragment (stream->manifest_stream);
|
||||
if (buffer) {
|
||||
gst_mss_stream_advance_fragment (stream->manifest_stream);
|
||||
}
|
||||
GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
|
||||
return;
|
||||
|
||||
|
@ -1130,10 +1177,25 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
|
|||
}
|
||||
|
||||
if (G_LIKELY (GST_IS_BUFFER (object))) {
|
||||
if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
|
||||
GST_ERROR_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
|
||||
GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
|
||||
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
|
||||
GST_TIME_ARGS (stream->next_timestamp));
|
||||
GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
|
||||
}
|
||||
|
||||
GST_DEBUG_OBJECT (mssdemux,
|
||||
"Pushing buffer %p %" GST_TIME_FORMAT " on pad %s", object,
|
||||
"Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
|
||||
" discont:%d on pad %s", object,
|
||||
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
|
||||
GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
|
||||
GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
|
||||
GST_PAD_NAME (stream->pad));
|
||||
|
||||
stream->next_timestamp =
|
||||
GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
|
||||
|
||||
ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
|
||||
} else if (GST_IS_EVENT (object)) {
|
||||
if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
|
||||
|
|
|
@ -63,6 +63,8 @@ struct _GstMssDemuxStream {
|
|||
|
||||
GstEvent *pending_newsegment;
|
||||
|
||||
GstClockTime next_timestamp;
|
||||
|
||||
/* Downloading task */
|
||||
GstTask *download_task;
|
||||
GStaticRecMutex download_lock;
|
||||
|
@ -80,6 +82,7 @@ struct _GstMssDemux {
|
|||
|
||||
GstMssManifest *manifest;
|
||||
gchar *base_url;
|
||||
gchar *manifest_uri;
|
||||
|
||||
GSList *streams;
|
||||
guint n_videos;
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
|
||||
#include <glib.h>
|
||||
#include <string.h>
|
||||
#include <stdio.h>
|
||||
#include <ctype.h>
|
||||
#include <libxml/parser.h>
|
||||
#include <libxml/tree.h>
|
||||
|
||||
|
@ -43,6 +45,8 @@
|
|||
#define MSS_PROP_TIMESCALE "TimeScale"
|
||||
#define MSS_PROP_URL "Url"
|
||||
|
||||
#define TO_LOWER(str) { char* p = str; for ( ; *p; ++p) *p = tolower(*p); }
|
||||
|
||||
/* TODO check if atoi is successful? */
|
||||
|
||||
typedef struct _GstMssStreamFragment
|
||||
|
@ -85,6 +89,8 @@ struct _GstMssManifest
|
|||
xmlDocPtr xml;
|
||||
xmlNodePtr xmlrootnode;
|
||||
|
||||
gboolean is_live;
|
||||
|
||||
GSList *streams;
|
||||
};
|
||||
|
||||
|
@ -187,7 +193,6 @@ _gst_mss_stream_init (GstMssStream * stream, xmlNodePtr node)
|
|||
|
||||
/* we reverse it later */
|
||||
stream->fragments = g_list_prepend (stream->fragments, fragment);
|
||||
|
||||
} else if (node_has_type (iter, MSS_NODE_STREAM_QUALITY)) {
|
||||
GstMssStreamQuality *quality = gst_mss_stream_quality_new (iter);
|
||||
stream->qualities = g_list_prepend (stream->qualities, quality);
|
||||
|
@ -215,6 +220,7 @@ gst_mss_manifest_new (const GstBuffer * data)
|
|||
GstMssManifest *manifest;
|
||||
xmlNodePtr root;
|
||||
xmlNodePtr nodeiter;
|
||||
gchar *live_str;
|
||||
|
||||
manifest = g_malloc0 (sizeof (GstMssManifest));
|
||||
|
||||
|
@ -222,6 +228,12 @@ gst_mss_manifest_new (const GstBuffer * data)
|
|||
GST_BUFFER_SIZE (data), "manifest", NULL, 0);
|
||||
root = manifest->xmlrootnode = xmlDocGetRootElement (manifest->xml);
|
||||
|
||||
live_str = (gchar *) xmlGetProp (root, (xmlChar *) "IsLive");
|
||||
if (live_str) {
|
||||
TO_LOWER (live_str);
|
||||
manifest->is_live = strcmp (live_str, "true") == 0;
|
||||
}
|
||||
|
||||
for (nodeiter = root->children; nodeiter; nodeiter = nodeiter->next) {
|
||||
if (nodeiter->type == XML_ELEMENT_NODE
|
||||
&& (strcmp ((const char *) nodeiter->name, "StreamIndex") == 0)) {
|
||||
|
@ -777,6 +789,138 @@ gst_mss_manifest_get_current_bitrate (GstMssManifest * manifest)
|
|||
return bitrate;
|
||||
}
|
||||
|
||||
gboolean
|
||||
gst_mss_manifest_is_live (GstMssManifest * manifest)
|
||||
{
|
||||
return manifest->is_live;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_mss_stream_reload_fragments (GstMssStream * stream, xmlNodePtr streamIndex)
|
||||
{
|
||||
xmlNodePtr iter;
|
||||
GList *new_fragments = NULL;
|
||||
GstMssStreamFragment *previous_fragment = NULL;
|
||||
GstMssStreamFragment *current_fragment =
|
||||
stream->current_fragment ? stream->current_fragment->data : NULL;
|
||||
guint64 current_time = gst_mss_stream_get_fragment_gst_timestamp (stream);
|
||||
guint fragment_number = 0;
|
||||
guint64 fragment_time_accum = 0;
|
||||
|
||||
if (!current_fragment && stream->fragments) {
|
||||
current_fragment = g_list_last (stream->fragments)->data;
|
||||
} else if (g_list_previous (stream->current_fragment)) {
|
||||
/* rewind one as this is the next to be pushed */
|
||||
current_fragment = g_list_previous (stream->current_fragment)->data;
|
||||
} else {
|
||||
current_fragment = NULL;
|
||||
}
|
||||
|
||||
if (current_fragment) {
|
||||
current_time = current_fragment->time;
|
||||
fragment_number = current_fragment->number;
|
||||
fragment_time_accum = current_fragment->time;
|
||||
}
|
||||
|
||||
for (iter = streamIndex->children; iter; iter = iter->next) {
|
||||
if (node_has_type (iter, MSS_NODE_STREAM_FRAGMENT)) {
|
||||
gchar *duration_str;
|
||||
gchar *time_str;
|
||||
gchar *seqnum_str;
|
||||
GstMssStreamFragment *fragment = g_new (GstMssStreamFragment, 1);
|
||||
|
||||
duration_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_DURATION);
|
||||
time_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_TIME);
|
||||
seqnum_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_NUMBER);
|
||||
|
||||
/* use the node's seq number or use the previous + 1 */
|
||||
if (seqnum_str) {
|
||||
fragment->number = g_ascii_strtoull (seqnum_str, NULL, 10);
|
||||
g_free (seqnum_str);
|
||||
} else {
|
||||
fragment->number = fragment_number;
|
||||
}
|
||||
fragment_number = fragment->number + 1;
|
||||
|
||||
if (time_str) {
|
||||
fragment->time = g_ascii_strtoull (time_str, NULL, 10);
|
||||
g_free (time_str);
|
||||
fragment_time_accum = fragment->time;
|
||||
} else {
|
||||
fragment->time = fragment_time_accum;
|
||||
}
|
||||
|
||||
/* if we have a previous fragment, means we need to set its duration */
|
||||
if (previous_fragment)
|
||||
previous_fragment->duration = fragment->time - previous_fragment->time;
|
||||
|
||||
if (duration_str) {
|
||||
fragment->duration = g_ascii_strtoull (duration_str, NULL, 10);
|
||||
|
||||
previous_fragment = NULL;
|
||||
fragment_time_accum += fragment->duration;
|
||||
g_free (duration_str);
|
||||
} else {
|
||||
/* store to set the duration at the next iteration */
|
||||
previous_fragment = fragment;
|
||||
}
|
||||
|
||||
if (fragment->time > current_time) {
|
||||
new_fragments = g_list_append (new_fragments, fragment);
|
||||
} else {
|
||||
previous_fragment = NULL;
|
||||
g_free (fragment);
|
||||
}
|
||||
|
||||
} else {
|
||||
/* TODO gst log this */
|
||||
}
|
||||
}
|
||||
|
||||
/* store the new fragments list */
|
||||
if (new_fragments) {
|
||||
g_list_free_full (stream->fragments, g_free);
|
||||
stream->fragments = new_fragments;
|
||||
stream->current_fragment = new_fragments;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
gst_mss_manifest_reload_fragments_from_xml (GstMssManifest * manifest,
|
||||
xmlNodePtr root)
|
||||
{
|
||||
xmlNodePtr nodeiter;
|
||||
GSList *streams = manifest->streams;
|
||||
|
||||
/* we assume the server is providing the streams in the same order in
|
||||
* every manifest */
|
||||
for (nodeiter = root->children; nodeiter && streams;
|
||||
nodeiter = nodeiter->next) {
|
||||
if (nodeiter->type == XML_ELEMENT_NODE
|
||||
&& (strcmp ((const char *) nodeiter->name, "StreamIndex") == 0)) {
|
||||
gst_mss_stream_reload_fragments (streams->data, nodeiter);
|
||||
streams = g_slist_next (streams);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
gst_mss_manifest_reload_fragments (GstMssManifest * manifest, GstBuffer * data)
|
||||
{
|
||||
xmlDocPtr xml;
|
||||
xmlNodePtr root;
|
||||
|
||||
g_return_if_fail (manifest->is_live);
|
||||
|
||||
xml = xmlReadMemory ((const gchar *) GST_BUFFER_DATA (data),
|
||||
GST_BUFFER_SIZE (data), "manifest", NULL, 0);
|
||||
root = xmlDocGetRootElement (xml);
|
||||
|
||||
gst_mss_manifest_reload_fragments_from_xml (manifest, root);
|
||||
|
||||
xmlFreeDoc (xml);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_mss_stream_select_bitrate (GstMssStream * stream, guint64 bitrate)
|
||||
{
|
||||
|
|
|
@ -47,6 +47,8 @@ GstClockTime gst_mss_manifest_get_gst_duration (GstMssManifest * manifest);
|
|||
gboolean gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time);
|
||||
gboolean gst_mss_manifest_change_bitrate (GstMssManifest *manifest, guint64 bitrate);
|
||||
guint64 gst_mss_manifest_get_current_bitrate (GstMssManifest * manifest);
|
||||
gboolean gst_mss_manifest_is_live (GstMssManifest * manifest);
|
||||
void gst_mss_manifest_reload_fragments (GstMssManifest * manifest, GstBuffer * data);
|
||||
|
||||
GstMssStreamType gst_mss_stream_get_type (GstMssStream *stream);
|
||||
GstCaps * gst_mss_stream_get_caps (GstMssStream * stream);
|
||||
|
|
Loading…
Reference in a new issue