mssdemux: avoid downloading not-linked streams

When a stream gets a not-linked return, it will be marked as so and
won't download any more new fragments until a reconfigure event
is received. This will make mssdemux expose all pads, but only download
fragments for the streams that are actually being used.

Relying on the pads being linked/unlinked isn't enough in this scenario
as there might be an input-selector downstream that is actually discarding
buffers for a given linked pad.

When streams are switching, the old active stream can be blocked because
input-selector will block not-linked streams. In case the mssdemux's
stream loop is blocked pushing a buffer to a full queue downstream it will
never unblock as the queue will not drain (input-selector is blocking).

In this scenario, stream switching will deadlock as input-selector is
waiting for the newly active stream data and the stream_loop that would
push this data is blocked waiting for input-selector.

To solve this issue, whenever an stream is reactivated on a reconfigure
it will enter into the 'catch up mode', in this mode it can push buffers
from its download thread until it reaches the currrent GstSegment's position.
This works because this timestamp will always be behind or equal to the maximum
timestamp pushed for all streams, after pushing data for this timestamp,
the stream will go back to default and be pushed sequentially from the main
streaming thread. By this time, the input-selector should have already
released the thread.

https://bugzilla.gnome.org/show_bug.cgi?id=711849
This commit is contained in:
Thiago Santos 2013-11-12 09:58:31 -03:00
parent 692a053ef8
commit 2ce4f6a8e4
2 changed files with 219 additions and 27 deletions

View file

@ -234,6 +234,7 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
stream->downloader = gst_uri_downloader_new (); stream->downloader = gst_uri_downloader_new ();
stream->dataqueue = stream->dataqueue =
gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream); gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream);
g_mutex_init (&stream->mutex);
/* Downloading task */ /* Downloading task */
g_rec_mutex_init (&stream->download_lock); g_rec_mutex_init (&stream->download_lock);
@ -292,6 +293,7 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
} }
if (stream->caps) if (stream->caps)
gst_caps_unref (stream->caps); gst_caps_unref (stream->caps);
g_mutex_clear (&stream->mutex);
g_free (stream); g_free (stream);
} }
@ -619,6 +621,9 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
GstMssDemuxStream *stream = iter->data; GstMssDemuxStream *stream = iter->data;
stream->eos = FALSE; stream->eos = FALSE;
if (flags & GST_SEEK_FLAG_FLUSH) {
stream->last_ret = GST_FLOW_OK;
}
gst_data_queue_flush (stream->dataqueue); gst_data_queue_flush (stream->dataqueue);
gst_event_replace (&stream->pending_newsegment, newsegment); gst_event_replace (&stream->pending_newsegment, newsegment);
} }
@ -637,6 +642,26 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_event_unref (event); gst_event_unref (event);
return TRUE; return TRUE;
} }
case GST_EVENT_RECONFIGURE:{
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
if (stream->pad == pad) {
GST_MSS_DEMUX_STREAM_LOCK (stream);
if (GST_TASK_STATE (stream->download_task) == GST_TASK_PAUSED
&& stream->last_ret == GST_FLOW_NOT_LINKED) {
stream->restart_download = TRUE;
gst_task_start (stream->download_task);
}
GST_MSS_DEMUX_STREAM_UNLOCK (stream);
gst_event_unref (event);
return TRUE;
}
}
}
break;
default: default:
break; break;
} }
@ -1051,16 +1076,23 @@ gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
static GstFlowReturn static GstFlowReturn
gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
gboolean * buffer_downloaded) GstBuffer ** _buffer)
{ {
GstMssDemux *mssdemux = stream->parent; GstMssDemux *mssdemux = stream->parent;
gchar *path; gchar *path;
gchar *url; gchar *url;
GstFragment *fragment; GstFragment *fragment;
GstBuffer *_buffer; GstBuffer *buffer;
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
guint64 before_download, after_download; guint64 before_download, after_download;
/* special case for not-linked streams */
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mssdemux, "Skipping download for not-linked stream %p",
stream);
return GST_FLOW_NOT_LINKED;
}
before_download = g_get_real_time (); before_download = g_get_real_time ();
GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream); GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
@ -1103,22 +1135,19 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
return GST_FLOW_ERROR; return GST_FLOW_ERROR;
} }
_buffer = gst_fragment_get_buffer (fragment); buffer = gst_fragment_get_buffer (fragment);
_buffer = gst_buffer_make_writable (_buffer); *_buffer = buffer = gst_buffer_make_writable (buffer);
GST_BUFFER_TIMESTAMP (_buffer) = GST_BUFFER_TIMESTAMP (buffer) =
gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream); gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
GST_BUFFER_DURATION (_buffer) = GST_BUFFER_DURATION (buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream); gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
g_object_unref (fragment); g_object_unref (fragment);
if (buffer_downloaded)
*buffer_downloaded = _buffer != NULL;
after_download = g_get_real_time (); after_download = g_get_real_time ();
if (_buffer) { if (_buffer) {
#ifndef GST_DISABLE_GST_DEBUG #ifndef GST_DISABLE_GST_DEBUG
guint64 bitrate = (8 * gst_buffer_get_size (_buffer) * 1000000LLU) / guint64 bitrate = (8 * gst_buffer_get_size (buffer) * 1000000LLU) /
(after_download - before_download); (after_download - before_download);
#endif #endif
@ -1126,16 +1155,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
"Measured download bitrate: %s %" G_GUINT64_FORMAT " bps", "Measured download bitrate: %s %" G_GUINT64_FORMAT " bps",
GST_PAD_NAME (stream->pad), bitrate); GST_PAD_NAME (stream->pad), bitrate);
gst_download_rate_add_rate (&stream->download_rate, gst_download_rate_add_rate (&stream->download_rate,
gst_buffer_get_size (_buffer), gst_buffer_get_size (buffer),
1000 * (after_download - before_download)); 1000 * (after_download - before_download));
GST_DEBUG_OBJECT (mssdemux,
"Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
" Duration: %" GST_TIME_FORMAT,
stream, GST_PAD_NAME (stream->pad),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
} }
return ret; return ret;
@ -1160,22 +1181,130 @@ static void
gst_mss_demux_download_loop (GstMssDemuxStream * stream) gst_mss_demux_download_loop (GstMssDemuxStream * stream)
{ {
GstMssDemux *mssdemux = stream->parent; GstMssDemux *mssdemux = stream->parent;
gboolean buffer_downloaded = FALSE;
GstFlowReturn ret; GstFlowReturn ret;
GstBuffer *buffer = NULL;
gboolean buffer_downloaded = FALSE;
GstEvent *gap = NULL;
GST_LOG_OBJECT (mssdemux, "download loop start %p", stream); GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
GST_OBJECT_LOCK (mssdemux); GST_OBJECT_LOCK (mssdemux);
if (G_UNLIKELY (stream->restart_download)) {
GstClockTime cur, ts;
gint64 pos;
GST_MSS_DEMUX_STREAM_LOCK (stream);
GST_DEBUG_OBJECT (mssdemux,
"Activating stream %p due to reconfigure " "event", stream);
cur = GST_CLOCK_TIME_IS_VALID (stream->next_timestamp) ?
stream->next_timestamp : 0;
if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
ts = (GstClockTime) pos;
GST_DEBUG_OBJECT (mssdemux, "Downstream position: %"
GST_TIME_FORMAT, GST_TIME_ARGS (ts));
} else {
GST_DEBUG_OBJECT (mssdemux, "Downstream position query failed, "
"failling back to segment position");
ts = mssdemux->segment.position;
}
/* we might have already pushed this data */
ts = MAX (ts, stream->next_timestamp);
GST_DEBUG_OBJECT (mssdemux, "Restarting stream %p %s:%s at "
"position %" GST_TIME_FORMAT ", catching up until segment position %"
GST_TIME_FORMAT, stream, GST_DEBUG_PAD_NAME (stream->pad),
GST_TIME_ARGS (ts), GST_TIME_ARGS (mssdemux->segment.position));
if (GST_CLOCK_TIME_IS_VALID (ts)) {
gst_mss_stream_seek (stream->manifest_stream, ts);
if (cur < ts) {
gap = gst_event_new_gap (cur, ts - cur);
}
}
/* This stream might be entering into catching up mode,
* meaning that it will push buffers from this same download thread
* until it reaches the segment position.
*
* The reason for this is that in case of stream switching, the other
* stream that was previously active might be blocking the stream_loop
* in case it is ahead enough that all queues are filled.
* In this case, it is possible that a downstream input-selector is
* blocking waiting for the currently active stream to reach the
* same position of the old linked stream because of the 'sync-streams'
* behavior.
*
* We can push from this thread up to segment position as all other
* streams should be around the same timestamp.
*/
stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
stream->eos = FALSE;
gst_data_queue_set_flushing (stream->dataqueue, FALSE);
stream->restart_download = FALSE;
gst_task_start (mssdemux->stream_task);
GST_MSS_DEMUX_STREAM_UNLOCK (stream);
}
GST_DEBUG_OBJECT (mssdemux, GST_DEBUG_OBJECT (mssdemux,
"Starting streams reconfiguration due to bitrate changes"); "Starting streams reconfiguration due to bitrate changes");
gst_mss_demux_reconfigure_stream (stream); gst_mss_demux_reconfigure_stream (stream);
GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration"); GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
GST_OBJECT_UNLOCK (mssdemux); GST_OBJECT_UNLOCK (mssdemux);
ret = gst_mss_demux_stream_download_fragment (stream, &buffer_downloaded); if (gap != NULL)
gst_pad_push_event (stream->pad, gap);
if (stream->cancelled) ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
buffer_downloaded = buffer != NULL;
if (stream->cancelled) {
if (buffer)
gst_buffer_unref (buffer);
goto cancelled; goto cancelled;
}
if (buffer) {
gboolean catch_up = FALSE;
/* Check if this stream is on catch up mode */
if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
GST_DEBUG_OBJECT (mssdemux,
"Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
GST_TIME_ARGS (mssdemux->segment.position),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
if (GST_BUFFER_TIMESTAMP (buffer) < mssdemux->segment.position) {
catch_up = TRUE;
} else {
GST_OBJECT_LOCK (mssdemux);
stream->last_ret = GST_FLOW_OK;
gst_task_start (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
}
}
GST_DEBUG_OBJECT (mssdemux,
"%s buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
" Duration: %" GST_TIME_FORMAT,
catch_up ? "Catch up push for" : "Storing", stream,
GST_PAD_NAME (stream->pad),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
if (catch_up) {
ret = stream->last_ret = gst_pad_push (stream->pad, buffer);
if (G_LIKELY (ret == GST_FLOW_OK))
stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
/* TODO handle return */
} else {
gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer));
}
}
switch (ret) { switch (ret) {
case GST_FLOW_OK: case GST_FLOW_OK:
@ -1184,6 +1313,8 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
goto eos; goto eos;
case GST_FLOW_ERROR: case GST_FLOW_ERROR:
goto error; goto error;
case GST_FLOW_NOT_LINKED:
goto notlinked;
default: default:
break; break;
} }
@ -1203,7 +1334,10 @@ eos:
GST_DEBUG_PAD_NAME (stream->pad)); GST_DEBUG_PAD_NAME (stream->pad));
gst_mss_demux_stream_store_object (stream, gst_mss_demux_stream_store_object (stream,
GST_MINI_OBJECT_CAST (gst_event_new_eos ())); GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
GST_OBJECT_LOCK (mssdemux);
gst_task_pause (stream->download_task); gst_task_pause (stream->download_task);
gst_task_start (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
return; return;
} }
error: error:
@ -1222,6 +1356,15 @@ cancelled:
gst_task_pause (stream->download_task); gst_task_pause (stream->download_task);
return; return;
} }
notlinked:
{
GST_MSS_DEMUX_STREAM_LOCK (stream);
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
gst_task_pause (stream->download_task);
gst_data_queue_set_flushing (stream->dataqueue, TRUE);
}
GST_MSS_DEMUX_STREAM_UNLOCK (stream);
}
} }
static GstFlowReturn static GstFlowReturn
@ -1242,12 +1385,18 @@ gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
GstDataQueueItem *item; GstDataQueueItem *item;
other = iter->data; other = iter->data;
if (other->eos) { if (other->eos || other->last_ret == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mssdemux, "Skipping stream %p eos:%d last-ret:%d",
other, other->eos, other->last_ret);
continue; continue;
} }
if (!gst_data_queue_peek (other->dataqueue, &item)) { if (!gst_data_queue_peek (other->dataqueue, &item)) {
/* flushing */ /* flushing */
if (other->last_ret == GST_FLOW_NOT_LINKED) {
/* might have been unlinked and won't receive data for now */
continue;
}
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
} }
@ -1269,6 +1418,27 @@ gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
return ret; return ret;
} }
static GstFlowReturn
gst_mss_demux_combine_flows (GstMssDemux * mssdemux)
{
gboolean all_notlinked = TRUE;
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
if (stream->last_ret != GST_FLOW_NOT_LINKED)
all_notlinked = FALSE;
if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
|| stream->last_ret == GST_FLOW_FLUSHING)
return stream->last_ret;
}
if (all_notlinked)
return GST_FLOW_NOT_LINKED;
return GST_FLOW_OK;
}
static void static void
gst_mss_demux_stream_loop (GstMssDemux * mssdemux) gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
{ {
@ -1289,6 +1459,8 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret, GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
gst_flow_get_name (ret)); gst_flow_get_name (ret));
/* Lock as this may change the tasks state */
GST_OBJECT_LOCK (mssdemux);
switch (ret) { switch (ret) {
case GST_FLOW_OK: case GST_FLOW_OK:
break; break;
@ -1302,6 +1474,7 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
default: default:
g_assert_not_reached (); g_assert_not_reached ();
} }
GST_OBJECT_UNLOCK (mssdemux);
GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s", GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
stream, GST_PAD_NAME (stream->pad)); stream, GST_PAD_NAME (stream->pad));
@ -1332,17 +1505,20 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
GST_DEBUG_OBJECT (mssdemux, GST_DEBUG_OBJECT (mssdemux,
"Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
" discont:%d on pad %s", object, " discont:%d on pad %s:%s", object,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
GST_TIME_ARGS (GST_BUFFER_DURATION (object)), GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT), GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
GST_PAD_NAME (stream->pad)); GST_DEBUG_PAD_NAME (stream->pad));
stream->next_timestamp = stream->next_timestamp =
GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object); GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
stream->have_data = TRUE; stream->have_data = TRUE;
ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object)); mssdemux->segment.position = GST_BUFFER_TIMESTAMP (object);
stream->last_ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
GST_DEBUG_OBJECT (mssdemux, "Pushed on pad %s:%s result: %d (%s)",
GST_DEBUG_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
} else if (GST_IS_EVENT (object)) { } else if (GST_IS_EVENT (object)) {
if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) { if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
stream->eos = TRUE; stream->eos = TRUE;
@ -1350,21 +1526,27 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object, GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
GST_PAD_NAME (stream->pad)); GST_PAD_NAME (stream->pad));
gst_pad_push_event (stream->pad, GST_EVENT_CAST (object)); gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
stream->last_ret = GST_FLOW_EOS;
} else { } else {
g_return_if_reached (); g_return_if_reached ();
} }
/* Lock as this may change the tasks state */
GST_OBJECT_LOCK (mssdemux);
ret = gst_mss_demux_combine_flows (mssdemux);
switch (ret) { switch (ret) {
case GST_FLOW_EOS: case GST_FLOW_EOS:
goto eos; /* EOS ? */ goto eos; /* EOS ? */
case GST_FLOW_ERROR: case GST_FLOW_ERROR:
goto error; goto error;
case GST_FLOW_NOT_LINKED: case GST_FLOW_NOT_LINKED:
break; /* TODO what to do here? pause the task or just keep pushing? */ /* stream won't download any more data until it gets a reconfigure */
break;
case GST_FLOW_OK: case GST_FLOW_OK:
default: default:
break; break;
} }
GST_OBJECT_UNLOCK (mssdemux);
GST_LOG_OBJECT (mssdemux, "Stream loop end"); GST_LOG_OBJECT (mssdemux, "Stream loop end");
return; return;
@ -1373,18 +1555,21 @@ eos:
{ {
GST_DEBUG_OBJECT (mssdemux, "EOS on all pads"); GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
gst_task_pause (mssdemux->stream_task); gst_task_pause (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
return; return;
} }
error: error:
{ {
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment"); GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
gst_task_pause (mssdemux->stream_task); gst_task_pause (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
return; return;
} }
stop: stop:
{ {
GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task"); GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task");
gst_task_pause (mssdemux->stream_task); gst_task_pause (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
return; return;
} }
} }

View file

@ -52,9 +52,14 @@ typedef struct _GstMssDemuxStream GstMssDemuxStream;
typedef struct _GstMssDemux GstMssDemux; typedef struct _GstMssDemux GstMssDemux;
typedef struct _GstMssDemuxClass GstMssDemuxClass; typedef struct _GstMssDemuxClass GstMssDemuxClass;
#define GST_MSS_DEMUX_STREAM_LOCK(s) g_mutex_lock (&(s)->mutex)
#define GST_MSS_DEMUX_STREAM_UNLOCK(s) g_mutex_unlock (&(s)->mutex)
struct _GstMssDemuxStream { struct _GstMssDemuxStream {
GstPad *pad; GstPad *pad;
GMutex mutex;
GstCaps *caps; GstCaps *caps;
GstMssDemux *parent; GstMssDemux *parent;
@ -72,9 +77,11 @@ struct _GstMssDemuxStream {
GstTask *download_task; GstTask *download_task;
GRecMutex download_lock; GRecMutex download_lock;
GstFlowReturn last_ret;
gboolean eos; gboolean eos;
gboolean have_data; gboolean have_data;
gboolean cancelled; gboolean cancelled;
gboolean restart_download;
GstDownloadRate download_rate; GstDownloadRate download_rate;