adaptivedemux: retry once on 4xx/5xx in certain conditions

This helps catch those 404 server errors in live streams when
seeking to the very beginning, as the server will handle a
request with some delay, which can cause it to drop the fragment
before sending it.

https://bugzilla.gnome.org/show_bug.cgi?id=753751
This commit is contained in:
Vincent Penquerc'h 2016-02-24 15:52:41 +00:00
parent 341cdb198f
commit 535f10b61d
2 changed files with 159 additions and 44 deletions

View file

@ -794,6 +794,7 @@ gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
GError *err = NULL; GError *err = NULL;
gchar *debug = NULL; gchar *debug = NULL;
gchar *new_error = NULL; gchar *new_error = NULL;
const GstStructure *details = NULL;
GST_MANIFEST_LOCK (demux); GST_MANIFEST_LOCK (demux);
@ -814,6 +815,12 @@ gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
err->message = new_error; err->message = new_error;
} }
gst_message_parse_error_details (msg, &details);
if (details) {
gst_structure_get_uint (details, "http-status-code",
&stream->last_status_code);
}
/* error, but ask to retry */ /* error, but ask to retry */
gst_adaptive_demux_stream_fragment_download_finish (stream, gst_adaptive_demux_stream_fragment_download_finish (stream,
GST_FLOW_CUSTOM_ERROR, err); GST_FLOW_CUSTOM_ERROR, err);
@ -2275,6 +2282,17 @@ gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
g_mutex_unlock (&stream->fragment_download_lock); g_mutex_unlock (&stream->fragment_download_lock);
} }
static GstFlowReturn
gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
{
GstFlowReturn ret;
GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
ret = klass->finish_fragment (stream->demux, stream);
gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
return ret;
}
static gboolean static gboolean
_src_event (GstPad * pad, GstObject * parent, GstEvent * event) _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{ {
@ -2283,20 +2301,14 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:{ case GST_EVENT_EOS:{
GstAdaptiveDemuxClass *klass;
GstFlowReturn ret;
GST_MANIFEST_LOCK (demux); GST_MANIFEST_LOCK (demux);
klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
g_mutex_lock (&stream->fragment_download_lock); g_mutex_lock (&stream->fragment_download_lock);
stream->download_finished = TRUE; stream->download_finished = TRUE;
g_cond_signal (&stream->fragment_download_cond); g_cond_signal (&stream->fragment_download_cond);
g_mutex_unlock (&stream->fragment_download_lock); g_mutex_unlock (&stream->fragment_download_lock);
ret = klass->finish_fragment (demux, stream); gst_adaptive_demux_eos_handling (stream);
gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
GST_MANIFEST_UNLOCK (demux); GST_MANIFEST_UNLOCK (demux);
break; break;
@ -2562,11 +2574,12 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
stream->uri_handler = uri_handler; stream->uri_handler = uri_handler;
stream->queue = queue; stream->queue = queue;
stream->last_status_code = 200; /* default to OK */
} }
return TRUE; return TRUE;
} }
static GstPadProbeReturn static GstPadProbeReturn
gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info, gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
gpointer user_data) gpointer user_data)
@ -2582,6 +2595,18 @@ gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
return GST_PAD_PROBE_OK; return GST_PAD_PROBE_OK;
} }
#ifndef GST_DISABLE_GST_DEBUG
static const char *
uritype (GstAdaptiveDemuxStream * s)
{
if (s->downloading_header)
return "header";
if (s->downloading_index)
return "index";
return "fragment";
}
#endif
/* must be called with manifest_lock taken. /* must be called with manifest_lock taken.
* Can temporarily release manifest_lock * Can temporarily release manifest_lock
*/ */
@ -2591,8 +2616,9 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
gint64 end, guint * http_status) gint64 end, guint * http_status)
{ {
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT GST_DEBUG_OBJECT (stream->pad,
" - %" G_GINT64_FORMAT, uri, start, end); "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
uritype (stream), uri, start, end);
if (http_status) if (http_status)
*http_status = 200; /* default to ok if no further information */ *http_status = 200; /* default to ok if no further information */
@ -2650,7 +2676,7 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
/* wait for the fragment to be completely downloaded */ /* wait for the fragment to be completely downloaded */
GST_DEBUG_OBJECT (stream->pad, GST_DEBUG_OBJECT (stream->pad,
"Waiting for fragment download to finish: %s", uri); "Waiting for %s download to finish: %s", uritype (stream), uri);
g_mutex_lock (&stream->fragment_download_lock); g_mutex_lock (&stream->fragment_download_lock);
stream->src_at_ready = FALSE; stream->src_at_ready = FALSE;
@ -2677,8 +2703,9 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
ret = stream->last_ret; ret = stream->last_ret;
GST_DEBUG_OBJECT (stream->pad, "Fragment download finished: %s %d %s", GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
uri, stream->last_ret, gst_flow_get_name (stream->last_ret)); uritype (stream), uri, stream->last_ret,
gst_flow_get_name (stream->last_ret));
if (stream->last_ret != GST_FLOW_OK && http_status) { if (stream->last_ret != GST_FLOW_OK && http_status) {
*http_status = stream->last_status_code; *http_status = stream->last_status_code;
} }
@ -2776,7 +2803,10 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
{ {
GstAdaptiveDemux *demux = stream->demux; GstAdaptiveDemux *demux = stream->demux;
gchar *url = NULL; gchar *url = NULL;
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret;
gboolean retried_once = FALSE, live;
guint http_status;
guint last_status_code;
stream->starting_fragment = TRUE; stream->starting_fragment = TRUE;
stream->last_ret = GST_FLOW_OK; stream->last_ret = GST_FLOW_OK;
@ -2794,16 +2824,23 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
stream->need_header = FALSE; stream->need_header = FALSE;
} }
again:
ret = GST_FLOW_OK;
url = stream->fragment.uri; url = stream->fragment.uri;
GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream); GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
if (url) { if (!url)
guint http_status = 200; return ret;
stream->last_ret = GST_FLOW_OK;
http_status = 200;
ret = ret =
gst_adaptive_demux_stream_download_uri (demux, stream, url, gst_adaptive_demux_stream_download_uri (demux, stream, url,
stream->fragment.range_start, stream->fragment.range_end, &http_status); stream->fragment.range_start, stream->fragment.range_end, &http_status);
GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s", GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
stream->last_ret, http_status, gst_flow_get_name (stream->last_ret)); stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
if (ret != GST_FLOW_OK) { if (ret == GST_FLOW_OK)
goto beach;
g_mutex_lock (&stream->fragment_download_lock); g_mutex_lock (&stream->fragment_download_lock);
if (G_UNLIKELY (stream->cancelled)) { if (G_UNLIKELY (stream->cancelled)) {
g_mutex_unlock (&stream->fragment_download_lock); g_mutex_unlock (&stream->fragment_download_lock);
@ -2812,26 +2849,103 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
g_mutex_unlock (&stream->fragment_download_lock); g_mutex_unlock (&stream->fragment_download_lock);
/* TODO check if we are truly stopping */ /* TODO check if we are truly stopping */
if (ret == GST_FLOW_CUSTOM_ERROR && gst_adaptive_demux_is_live (demux)) { if (ret != GST_FLOW_CUSTOM_ERROR)
if (++stream->download_error_count <= MAX_DOWNLOAD_ERROR_COUNT) { goto beach;
/* looks like there is no way of knowing when a live stream has ended
* Have to assume we are falling behind and cause a manifest reload */ last_status_code = stream->last_status_code;
GST_DEBUG_OBJECT (stream->pad, GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
"Converting error of live stream to EOS"); last_status_code, stream->download_error_count);
return GST_FLOW_EOS;
live = gst_adaptive_demux_is_live (demux);
if (!retried_once && ((last_status_code / 100 == 4 && live) || last_status_code / 100 == 5)) { /* 4xx/5xx */
/* if current position is before available start, switch to next */
if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
goto flushing;
if (live) {
gint64 range_start, range_stop;
if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
&range_stop))
goto flushing;
if (demux->segment.position < range_start) {
GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
stream->last_ret = GST_FLOW_OK;
ret = gst_adaptive_demux_eos_handling (stream);
GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
gst_flow_get_name (ret));
ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
gst_flow_get_name (ret));
if (ret == GST_FLOW_OK) {
retried_once = TRUE;
goto again;
} }
} else if (ret == GST_FLOW_CUSTOM_ERROR } else if (demux->segment.position > range_stop) {
&& !gst_adaptive_demux_stream_has_next_fragment (demux, stream)) { /* wait a bit to be in range, we don't have any locks at that point */
/* If this is the last fragment, consider failures EOS and not actual gint64 wait_time =
* errors. Due to rounding errors in the durations, the last fragment gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
* might not actually exist */ if (wait_time > 0) {
gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
GST_DEBUG_OBJECT (stream->pad, GST_DEBUG_OBJECT (stream->pad,
"Converting error for last fragment to EOS"); "Download waiting for %" GST_TIME_FORMAT,
return GST_FLOW_EOS; GST_TIME_ARGS (wait_time));
GST_MANIFEST_UNLOCK (demux);
g_mutex_lock (&stream->fragment_download_lock);
if (G_UNLIKELY (stream->cancelled)) {
g_mutex_unlock (&stream->fragment_download_lock);
GST_MANIFEST_LOCK (demux);
stream->last_ret = GST_FLOW_FLUSHING;
goto flushing;
}
do {
g_cond_wait_until (&stream->fragment_download_cond,
&stream->fragment_download_lock, end_time);
if (G_UNLIKELY (stream->cancelled)) {
g_mutex_unlock (&stream->fragment_download_lock);
GST_MANIFEST_LOCK (demux);
stream->last_ret = GST_FLOW_FLUSHING;
goto flushing;
}
} while (!stream->download_finished);
g_mutex_unlock (&stream->fragment_download_lock);
GST_MANIFEST_LOCK (demux);
} }
} }
} }
flushing:
if (++stream->download_error_count <= MAX_DOWNLOAD_ERROR_COUNT) {
/* looks like there is no way of knowing when a live stream has ended
* Have to assume we are falling behind and cause a manifest reload */
GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
return GST_FLOW_EOS;
}
}
else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
/* If this is the last fragment, consider failures EOS and not actual
* errors. Due to rounding errors in the durations, the last fragment
* might not actually exist */
GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
return GST_FLOW_EOS;
} else {
/* retry once (same segment) for 5xx (server errors) */
if (!retried_once) {
retried_once = TRUE;
/* wait a short time in case the server needs a bit to recover, we don't
* care if we get woken up before end time. We can use sleep here since
* we're already blocking and just want to wait some time. */
g_usleep (100000); /* a tenth of a second */
goto again;
}
}
beach:
return ret; return ret;
no_url_error: no_url_error:

View file

@ -145,6 +145,7 @@ struct _GstAdaptiveDemuxStream
/* download tooling */ /* download tooling */
GstElement *src; GstElement *src;
guint last_status_code;
GstPad *src_srcpad; GstPad *src_srcpad;
GstElement *uri_handler; GstElement *uri_handler;
GstElement *queue; GstElement *queue;