From 2ce4f6a8e4c8aa33d08d9f5fe28a83779fadf03f Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Tue, 12 Nov 2013 09:58:31 -0300 Subject: [PATCH] mssdemux: avoid downloading not-linked streams When a stream gets a not-linked return, it will be marked as so and won't download any more new fragments until a reconfigure event is received. This will make mssdemux expose all pads, but only download fragments for the streams that are actually being used. Relying on the pads being linked/unlinked isn't enough in this scenario as there might be an input-selector downstream that is actually discarding buffers for a given linked pad. When streams are switching, the old active stream can be blocked because input-selector will block not-linked streams. In case the mssdemux's stream loop is blocked pushing a buffer to a full queue downstream it will never unblock as the queue will not drain (input-selector is blocking). In this scenario, stream switching will deadlock as input-selector is waiting for the newly active stream data and the stream_loop that would push this data is blocked waiting for input-selector. To solve this issue, whenever an stream is reactivated on a reconfigure it will enter into the 'catch up mode', in this mode it can push buffers from its download thread until it reaches the currrent GstSegment's position. This works because this timestamp will always be behind or equal to the maximum timestamp pushed for all streams, after pushing data for this timestamp, the stream will go back to default and be pushed sequentially from the main streaming thread. By this time, the input-selector should have already released the thread. https://bugzilla.gnome.org/show_bug.cgi?id=711849 --- ext/smoothstreaming/gstmssdemux.c | 239 ++++++++++++++++++++++++++---- ext/smoothstreaming/gstmssdemux.h | 7 + 2 files changed, 219 insertions(+), 27 deletions(-) diff --git a/ext/smoothstreaming/gstmssdemux.c b/ext/smoothstreaming/gstmssdemux.c index 368863969d..fc7fa7188f 100644 --- a/ext/smoothstreaming/gstmssdemux.c +++ b/ext/smoothstreaming/gstmssdemux.c @@ -234,6 +234,7 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux, stream->downloader = gst_uri_downloader_new (); stream->dataqueue = gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream); + g_mutex_init (&stream->mutex); /* Downloading task */ g_rec_mutex_init (&stream->download_lock); @@ -292,6 +293,7 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream) } if (stream->caps) gst_caps_unref (stream->caps); + g_mutex_clear (&stream->mutex); g_free (stream); } @@ -619,6 +621,9 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) GstMssDemuxStream *stream = iter->data; stream->eos = FALSE; + if (flags & GST_SEEK_FLAG_FLUSH) { + stream->last_ret = GST_FLOW_OK; + } gst_data_queue_flush (stream->dataqueue); gst_event_replace (&stream->pending_newsegment, newsegment); } @@ -637,6 +642,26 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_event_unref (event); return TRUE; } + case GST_EVENT_RECONFIGURE:{ + GSList *iter; + + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + + if (stream->pad == pad) { + GST_MSS_DEMUX_STREAM_LOCK (stream); + if (GST_TASK_STATE (stream->download_task) == GST_TASK_PAUSED + && stream->last_ret == GST_FLOW_NOT_LINKED) { + stream->restart_download = TRUE; + gst_task_start (stream->download_task); + } + GST_MSS_DEMUX_STREAM_UNLOCK (stream); + gst_event_unref (event); + return TRUE; + } + } + } + break; default: break; } @@ -1051,16 +1076,23 @@ gst_mss_demux_stream_store_object (GstMssDemuxStream * stream, static GstFlowReturn gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, - gboolean * buffer_downloaded) + GstBuffer ** _buffer) { GstMssDemux *mssdemux = stream->parent; gchar *path; gchar *url; GstFragment *fragment; - GstBuffer *_buffer; + GstBuffer *buffer; GstFlowReturn ret = GST_FLOW_OK; guint64 before_download, after_download; + /* special case for not-linked streams */ + if (stream->last_ret == GST_FLOW_NOT_LINKED) { + GST_DEBUG_OBJECT (mssdemux, "Skipping download for not-linked stream %p", + stream); + return GST_FLOW_NOT_LINKED; + } + before_download = g_get_real_time (); GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream); @@ -1103,22 +1135,19 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, return GST_FLOW_ERROR; } - _buffer = gst_fragment_get_buffer (fragment); - _buffer = gst_buffer_make_writable (_buffer); - GST_BUFFER_TIMESTAMP (_buffer) = + buffer = gst_fragment_get_buffer (fragment); + *_buffer = buffer = gst_buffer_make_writable (buffer); + GST_BUFFER_TIMESTAMP (buffer) = gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream); - GST_BUFFER_DURATION (_buffer) = + GST_BUFFER_DURATION (buffer) = gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream); g_object_unref (fragment); - if (buffer_downloaded) - *buffer_downloaded = _buffer != NULL; - after_download = g_get_real_time (); if (_buffer) { #ifndef GST_DISABLE_GST_DEBUG - guint64 bitrate = (8 * gst_buffer_get_size (_buffer) * 1000000LLU) / + guint64 bitrate = (8 * gst_buffer_get_size (buffer) * 1000000LLU) / (after_download - before_download); #endif @@ -1126,16 +1155,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, "Measured download bitrate: %s %" G_GUINT64_FORMAT " bps", GST_PAD_NAME (stream->pad), bitrate); gst_download_rate_add_rate (&stream->download_rate, - gst_buffer_get_size (_buffer), + gst_buffer_get_size (buffer), 1000 * (after_download - before_download)); - - GST_DEBUG_OBJECT (mssdemux, - "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT - " Duration: %" GST_TIME_FORMAT, - stream, GST_PAD_NAME (stream->pad), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)), - GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer))); - gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer)); } return ret; @@ -1160,22 +1181,130 @@ static void gst_mss_demux_download_loop (GstMssDemuxStream * stream) { GstMssDemux *mssdemux = stream->parent; - gboolean buffer_downloaded = FALSE; GstFlowReturn ret; + GstBuffer *buffer = NULL; + gboolean buffer_downloaded = FALSE; + GstEvent *gap = NULL; GST_LOG_OBJECT (mssdemux, "download loop start %p", stream); GST_OBJECT_LOCK (mssdemux); + if (G_UNLIKELY (stream->restart_download)) { + GstClockTime cur, ts; + gint64 pos; + + GST_MSS_DEMUX_STREAM_LOCK (stream); + + GST_DEBUG_OBJECT (mssdemux, + "Activating stream %p due to reconfigure " "event", stream); + + cur = GST_CLOCK_TIME_IS_VALID (stream->next_timestamp) ? + stream->next_timestamp : 0; + + if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) { + ts = (GstClockTime) pos; + GST_DEBUG_OBJECT (mssdemux, "Downstream position: %" + GST_TIME_FORMAT, GST_TIME_ARGS (ts)); + } else { + GST_DEBUG_OBJECT (mssdemux, "Downstream position query failed, " + "failling back to segment position"); + ts = mssdemux->segment.position; + } + + /* we might have already pushed this data */ + ts = MAX (ts, stream->next_timestamp); + + GST_DEBUG_OBJECT (mssdemux, "Restarting stream %p %s:%s at " + "position %" GST_TIME_FORMAT ", catching up until segment position %" + GST_TIME_FORMAT, stream, GST_DEBUG_PAD_NAME (stream->pad), + GST_TIME_ARGS (ts), GST_TIME_ARGS (mssdemux->segment.position)); + + if (GST_CLOCK_TIME_IS_VALID (ts)) { + gst_mss_stream_seek (stream->manifest_stream, ts); + + if (cur < ts) { + gap = gst_event_new_gap (cur, ts - cur); + } + } + + /* This stream might be entering into catching up mode, + * meaning that it will push buffers from this same download thread + * until it reaches the segment position. + * + * The reason for this is that in case of stream switching, the other + * stream that was previously active might be blocking the stream_loop + * in case it is ahead enough that all queues are filled. + * In this case, it is possible that a downstream input-selector is + * blocking waiting for the currently active stream to reach the + * same position of the old linked stream because of the 'sync-streams' + * behavior. + * + * We can push from this thread up to segment position as all other + * streams should be around the same timestamp. + */ + stream->last_ret = GST_FLOW_CUSTOM_SUCCESS; + stream->eos = FALSE; + + gst_data_queue_set_flushing (stream->dataqueue, FALSE); + stream->restart_download = FALSE; + gst_task_start (mssdemux->stream_task); + GST_MSS_DEMUX_STREAM_UNLOCK (stream); + } + GST_DEBUG_OBJECT (mssdemux, "Starting streams reconfiguration due to bitrate changes"); gst_mss_demux_reconfigure_stream (stream); GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration"); GST_OBJECT_UNLOCK (mssdemux); - ret = gst_mss_demux_stream_download_fragment (stream, &buffer_downloaded); + if (gap != NULL) + gst_pad_push_event (stream->pad, gap); - if (stream->cancelled) + ret = gst_mss_demux_stream_download_fragment (stream, &buffer); + buffer_downloaded = buffer != NULL; + + if (stream->cancelled) { + if (buffer) + gst_buffer_unref (buffer); goto cancelled; + } + + if (buffer) { + gboolean catch_up = FALSE; + + /* Check if this stream is on catch up mode */ + if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) { + GST_DEBUG_OBJECT (mssdemux, + "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT, + GST_TIME_ARGS (mssdemux->segment.position), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); + if (GST_BUFFER_TIMESTAMP (buffer) < mssdemux->segment.position) { + catch_up = TRUE; + } else { + GST_OBJECT_LOCK (mssdemux); + stream->last_ret = GST_FLOW_OK; + gst_task_start (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); + } + } + + GST_DEBUG_OBJECT (mssdemux, + "%s buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT + " Duration: %" GST_TIME_FORMAT, + catch_up ? "Catch up push for" : "Storing", stream, + GST_PAD_NAME (stream->pad), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); + + if (catch_up) { + ret = stream->last_ret = gst_pad_push (stream->pad, buffer); + if (G_LIKELY (ret == GST_FLOW_OK)) + stream->last_ret = GST_FLOW_CUSTOM_SUCCESS; + /* TODO handle return */ + } else { + gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer)); + } + } switch (ret) { case GST_FLOW_OK: @@ -1184,6 +1313,8 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream) goto eos; case GST_FLOW_ERROR: goto error; + case GST_FLOW_NOT_LINKED: + goto notlinked; default: break; } @@ -1203,7 +1334,10 @@ eos: GST_DEBUG_PAD_NAME (stream->pad)); gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (gst_event_new_eos ())); + GST_OBJECT_LOCK (mssdemux); gst_task_pause (stream->download_task); + gst_task_start (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); return; } error: @@ -1222,6 +1356,15 @@ cancelled: gst_task_pause (stream->download_task); return; } +notlinked: + { + GST_MSS_DEMUX_STREAM_LOCK (stream); + if (stream->last_ret == GST_FLOW_NOT_LINKED) { + gst_task_pause (stream->download_task); + gst_data_queue_set_flushing (stream->dataqueue, TRUE); + } + GST_MSS_DEMUX_STREAM_UNLOCK (stream); + } } static GstFlowReturn @@ -1242,12 +1385,18 @@ gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux, GstDataQueueItem *item; other = iter->data; - if (other->eos) { + if (other->eos || other->last_ret == GST_FLOW_NOT_LINKED) { + GST_DEBUG_OBJECT (mssdemux, "Skipping stream %p eos:%d last-ret:%d", + other, other->eos, other->last_ret); continue; } if (!gst_data_queue_peek (other->dataqueue, &item)) { /* flushing */ + if (other->last_ret == GST_FLOW_NOT_LINKED) { + /* might have been unlinked and won't receive data for now */ + continue; + } return GST_FLOW_FLUSHING; } @@ -1269,6 +1418,27 @@ gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux, return ret; } +static GstFlowReturn +gst_mss_demux_combine_flows (GstMssDemux * mssdemux) +{ + gboolean all_notlinked = TRUE; + GSList *iter; + + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + + if (stream->last_ret != GST_FLOW_NOT_LINKED) + all_notlinked = FALSE; + + if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED + || stream->last_ret == GST_FLOW_FLUSHING) + return stream->last_ret; + } + if (all_notlinked) + return GST_FLOW_NOT_LINKED; + return GST_FLOW_OK; +} + static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux) { @@ -1289,6 +1459,8 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret, gst_flow_get_name (ret)); + /* Lock as this may change the tasks state */ + GST_OBJECT_LOCK (mssdemux); switch (ret) { case GST_FLOW_OK: break; @@ -1302,6 +1474,7 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) default: g_assert_not_reached (); } + GST_OBJECT_UNLOCK (mssdemux); GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s", stream, GST_PAD_NAME (stream->pad)); @@ -1332,17 +1505,20 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) GST_DEBUG_OBJECT (mssdemux, "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT - " discont:%d on pad %s", object, + " discont:%d on pad %s:%s", object, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)), GST_TIME_ARGS (GST_BUFFER_DURATION (object)), GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT), - GST_PAD_NAME (stream->pad)); + GST_DEBUG_PAD_NAME (stream->pad)); stream->next_timestamp = GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object); stream->have_data = TRUE; - ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object)); + mssdemux->segment.position = GST_BUFFER_TIMESTAMP (object); + stream->last_ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object)); + GST_DEBUG_OBJECT (mssdemux, "Pushed on pad %s:%s result: %d (%s)", + GST_DEBUG_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret)); } else if (GST_IS_EVENT (object)) { if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) { stream->eos = TRUE; @@ -1350,21 +1526,27 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object, GST_PAD_NAME (stream->pad)); gst_pad_push_event (stream->pad, GST_EVENT_CAST (object)); + stream->last_ret = GST_FLOW_EOS; } else { g_return_if_reached (); } + /* Lock as this may change the tasks state */ + GST_OBJECT_LOCK (mssdemux); + ret = gst_mss_demux_combine_flows (mssdemux); switch (ret) { case GST_FLOW_EOS: goto eos; /* EOS ? */ case GST_FLOW_ERROR: goto error; case GST_FLOW_NOT_LINKED: - break; /* TODO what to do here? pause the task or just keep pushing? */ + /* stream won't download any more data until it gets a reconfigure */ + break; case GST_FLOW_OK: default: break; } + GST_OBJECT_UNLOCK (mssdemux); GST_LOG_OBJECT (mssdemux, "Stream loop end"); return; @@ -1373,18 +1555,21 @@ eos: { GST_DEBUG_OBJECT (mssdemux, "EOS on all pads"); gst_task_pause (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); return; } error: { GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment"); gst_task_pause (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); return; } stop: { GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task"); gst_task_pause (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); return; } } diff --git a/ext/smoothstreaming/gstmssdemux.h b/ext/smoothstreaming/gstmssdemux.h index 6bc3ba4653..d651b8050f 100644 --- a/ext/smoothstreaming/gstmssdemux.h +++ b/ext/smoothstreaming/gstmssdemux.h @@ -52,9 +52,14 @@ typedef struct _GstMssDemuxStream GstMssDemuxStream; typedef struct _GstMssDemux GstMssDemux; typedef struct _GstMssDemuxClass GstMssDemuxClass; +#define GST_MSS_DEMUX_STREAM_LOCK(s) g_mutex_lock (&(s)->mutex) +#define GST_MSS_DEMUX_STREAM_UNLOCK(s) g_mutex_unlock (&(s)->mutex) + struct _GstMssDemuxStream { GstPad *pad; + GMutex mutex; + GstCaps *caps; GstMssDemux *parent; @@ -72,9 +77,11 @@ struct _GstMssDemuxStream { GstTask *download_task; GRecMutex download_lock; + GstFlowReturn last_ret; gboolean eos; gboolean have_data; gboolean cancelled; + gboolean restart_download; GstDownloadRate download_rate;