diff --git a/ext/smoothstreaming/gstmssdemux.c b/ext/smoothstreaming/gstmssdemux.c index 8e4e263feb..a596726c67 100644 --- a/ext/smoothstreaming/gstmssdemux.c +++ b/ext/smoothstreaming/gstmssdemux.c @@ -85,7 +85,8 @@ static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event); static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query); -static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream); +static void gst_mss_demux_download_loop (GstMssDemuxStream * stream); +static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux); static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux); @@ -143,6 +144,23 @@ gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass) gst_pad_set_event_function (mssdemux->sinkpad, GST_DEBUG_FUNCPTR (gst_mss_demux_event)); gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad); + + g_static_rec_mutex_init (&mssdemux->stream_lock); + mssdemux->stream_task = + gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux); + gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock); +} + +static gboolean +_data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes, + guint64 time, gpointer checkdata) +{ + GstMssDemuxStream *stream = checkdata; + GstMssDemux *mssdemux = stream->parent; + + if (mssdemux->data_queue_max_size == 0) + return FALSE; /* never full */ + return visible >= mssdemux->data_queue_max_size; } static GstMssDemuxStream * @@ -153,12 +171,13 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux, stream = g_new0 (GstMssDemuxStream, 1); stream->downloader = gst_uri_downloader_new (); + stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream); - /* Streaming task */ - g_static_rec_mutex_init (&stream->stream_lock); - stream->stream_task = - gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream); - gst_task_set_lock (stream->stream_task, &stream->stream_lock); + /* Downloading task */ + g_static_rec_mutex_init (&stream->download_lock); + stream->download_task = + gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream); + gst_task_set_lock (stream->download_task, &stream->download_lock); stream->pad = srcpad; stream->manifest_stream = manifeststream; @@ -170,20 +189,20 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux, static void gst_mss_demux_stream_free (GstMssDemuxStream * stream) { - if (stream->stream_task) { - if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) { + if (stream->download_task) { + if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) { GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s", GST_DEBUG_PAD_NAME (stream->pad)); - gst_task_stop (stream->stream_task); - g_static_rec_mutex_lock (&stream->stream_lock); - g_static_rec_mutex_unlock (&stream->stream_lock); + gst_task_stop (stream->download_task); + g_static_rec_mutex_lock (&stream->download_lock); + g_static_rec_mutex_unlock (&stream->download_lock); GST_LOG_OBJECT (stream->parent, "Waiting for task to finish"); - gst_task_join (stream->stream_task); + gst_task_join (stream->download_task); GST_LOG_OBJECT (stream->parent, "Finished"); } - gst_object_unref (stream->stream_task); - g_static_rec_mutex_free (&stream->stream_lock); - stream->stream_task = NULL; + gst_object_unref (stream->download_task); + g_static_rec_mutex_free (&stream->download_lock); + stream->download_task = NULL; } if (stream->pending_newsegment) { @@ -196,6 +215,10 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream) g_object_unref (stream->downloader); stream->downloader = NULL; } + if (stream->dataqueue) { + g_object_unref (stream->dataqueue); + stream->dataqueue = NULL; + } if (stream->pad) { gst_object_unref (stream->pad); stream->pad = NULL; @@ -207,6 +230,14 @@ static void gst_mss_demux_reset (GstMssDemux * mssdemux) { GSList *iter; + + if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) { + gst_task_stop (mssdemux->stream_task); + g_static_rec_mutex_lock (&mssdemux->stream_lock); + g_static_rec_mutex_unlock (&mssdemux->stream_lock); + gst_task_join (mssdemux->stream_task); + } + if (mssdemux->manifest_buffer) { gst_buffer_unref (mssdemux->manifest_buffer); mssdemux->manifest_buffer = NULL; @@ -233,7 +264,13 @@ gst_mss_demux_reset (GstMssDemux * mssdemux) static void gst_mss_demux_dispose (GObject * object) { - /* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */ + GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); + + if (mssdemux->stream_task) { + gst_object_unref (mssdemux->stream_task); + g_static_rec_mutex_free (&mssdemux->stream_lock); + mssdemux->stream_task = NULL; + } G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -325,8 +362,10 @@ gst_mss_demux_start (GstMssDemux * mssdemux) GST_INFO_OBJECT (mssdemux, "Starting streams' tasks"); for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { GstMssDemuxStream *stream = iter->data; - gst_task_start (stream->stream_task); + gst_task_start (stream->download_task); } + + gst_task_start (mssdemux->stream_task); } static gboolean @@ -378,17 +417,23 @@ static void gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate) { GSList *iter; + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { GstMssDemuxStream *stream = iter->data; + gst_data_queue_set_flushing (stream->dataqueue, TRUE); + if (immediate) gst_uri_downloader_cancel (stream->downloader); - gst_task_pause (stream->stream_task); + gst_task_pause (stream->download_task); } + gst_task_pause (mssdemux->stream_task); + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { GstMssDemuxStream *stream = iter->data; - g_static_rec_mutex_lock (&stream->stream_lock); + g_static_rec_mutex_lock (&stream->download_lock); } + g_static_rec_mutex_lock (&mssdemux->stream_lock); } static void @@ -397,13 +442,16 @@ gst_mss_demux_restart_tasks (GstMssDemux * mssdemux) GSList *iter; for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { GstMssDemuxStream *stream = iter->data; - g_static_rec_mutex_unlock (&stream->stream_lock); + g_static_rec_mutex_unlock (&stream->download_lock); } + g_static_rec_mutex_unlock (&mssdemux->stream_lock); for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { GstMssDemuxStream *stream = iter->data; - gst_task_start (stream->stream_task); + gst_data_queue_set_flushing (stream->dataqueue, FALSE); + gst_task_start (stream->download_task); } + gst_task_start (mssdemux->stream_task); } static gboolean @@ -458,6 +506,8 @@ gst_mss_demux_src_event (GstPad * pad, GstEvent * event) for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { GstMssDemuxStream *stream = iter->data; + stream->eos = FALSE; + gst_data_queue_flush (stream->dataqueue); stream->pending_newsegment = gst_event_ref (newsegment); } gst_event_unref (newsegment); @@ -727,7 +777,7 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux) GSList *oldpads = NULL; GSList *iter; - gst_mss_demux_stop_tasks (mssdemux, FALSE); + gst_mss_demux_stop_tasks (mssdemux, TRUE); if (gst_mss_manifest_change_bitrate (mssdemux->manifest, mssdemux->connection_speed)) { @@ -736,15 +786,46 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux) for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { GstMssDemuxStream *stream = iter->data; GstPad *oldpad = stream->pad; - GstClockTime ts = - gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream); + GstClockTime ts = GST_CLOCK_TIME_NONE; oldpads = g_slist_prepend (oldpads, oldpad); + /* since we are flushing the queue, get the next un-pushed timestamp to seek + * and avoid gaps */ + gst_data_queue_set_flushing (stream->dataqueue, FALSE); + if (!gst_data_queue_is_empty (stream->dataqueue)) { + GstDataQueueItem *item = NULL; + + while (!gst_data_queue_is_empty (stream->dataqueue) + && !GST_CLOCK_TIME_IS_VALID (ts)) { + gst_data_queue_pop (stream->dataqueue, &item); + + if (!item) { + g_assert_not_reached (); + break; + } + + if (GST_IS_BUFFER (item->object)) { + GstBuffer *buffer = GST_BUFFER_CAST (item->object); + + ts = GST_BUFFER_TIMESTAMP (buffer); + } + item->destroy (item); + } + + } + if (!GST_CLOCK_TIME_IS_VALID (ts)) { + ts = gst_mss_stream_get_fragment_gst_timestamp + (stream->manifest_stream); + } + + GST_DEBUG_OBJECT (mssdemux, + "Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream, + GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts)); + gst_mss_stream_seek (stream->manifest_stream, ts); + gst_data_queue_flush (stream->dataqueue); + stream->pad = _create_pad (mssdemux, stream->manifest_stream); - /* TODO keep the same playback rate */ - stream->pending_newsegment = - gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, ts, -1, ts); gst_mss_demux_expose_stream (mssdemux, stream); gst_pad_push_event (oldpad, gst_event_new_eos ()); @@ -763,6 +844,37 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux) gst_mss_demux_restart_tasks (mssdemux); } +static void +_free_data_queue_item (gpointer obj) +{ + GstDataQueueItem *item = obj; + + gst_mini_object_unref (item->object); + g_slice_free (GstDataQueueItem, item); +} + +static void +gst_mss_demux_stream_store_object (GstMssDemuxStream * stream, + GstMiniObject * obj) +{ + GstDataQueueItem *item; + + item = g_slice_new (GstDataQueueItem); + item->object = (GstMiniObject *) obj; + + item->duration = 0; /* we don't care */ + item->size = 0; + item->visible = TRUE; + + item->destroy = (GDestroyNotify) _free_data_queue_item; + + if (!gst_data_queue_push (stream->dataqueue, item)) { + GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj); + gst_mini_object_unref (obj); + g_slice_free (GstDataQueueItem, item); + } +} + static GstFlowReturn gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, GstBuffer ** buffer) @@ -811,7 +923,17 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, GST_BUFFER_DURATION (_buffer) = gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream); - *buffer = _buffer; + if (buffer) + *buffer = _buffer; + + if (_buffer) { + GST_DEBUG_OBJECT (mssdemux, + "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT, + stream, GST_PAD_NAME (stream->pad), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer))); + gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer)); + } + return ret; no_url_error: @@ -819,39 +941,26 @@ no_url_error: GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX, (_("Failed to get fragment URL.")), ("An error happened when getting fragment URL")); - gst_task_stop (stream->stream_task); + gst_task_stop (stream->download_task); return GST_FLOW_ERROR; } error: { GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment"); - gst_task_stop (stream->stream_task); + gst_task_stop (stream->download_task); return GST_FLOW_ERROR; } } static void -gst_mss_demux_stream_loop (GstMssDemuxStream * stream) +gst_mss_demux_download_loop (GstMssDemuxStream * stream) { GstMssDemux *mssdemux = stream->parent; GstBuffer *buffer = NULL; GstFlowReturn ret; - GST_OBJECT_LOCK (mssdemux); - if (mssdemux->update_bitrates) { - mssdemux->update_bitrates = FALSE; - GST_OBJECT_UNLOCK (mssdemux); + GST_LOG_OBJECT (mssdemux, "download loop start %p", stream); - GST_DEBUG_OBJECT (mssdemux, - "Starting streams reconfiguration due to bitrate changes"); - g_thread_create ((GThreadFunc) gst_mss_demux_reconfigure, mssdemux, FALSE, - NULL); - GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration"); - gst_task_stop (stream->stream_task); - return; - } else { - GST_OBJECT_UNLOCK (mssdemux); - } ret = gst_mss_demux_stream_download_fragment (stream, &buffer); switch (ret) { @@ -867,14 +976,154 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream) g_assert (buffer != NULL); + gst_mss_stream_advance_fragment (stream->manifest_stream); + GST_LOG_OBJECT (mssdemux, "download loop end %p", stream); + return; + +eos: + { + GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s", + GST_DEBUG_PAD_NAME (stream->pad)); + gst_mss_demux_stream_store_object (stream, + GST_MINI_OBJECT_CAST (gst_event_new_eos ())); + gst_task_stop (stream->download_task); + return; + } +error: + { + GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment"); + gst_task_stop (stream->download_task); + return; + } +} + +static GstFlowReturn +gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux, + GstMssDemuxStream ** stream) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstMssDemuxStream *current = NULL; + GstClockTime cur_time = GST_CLOCK_TIME_NONE; + GSList *iter; + + if (!mssdemux->streams) + return GST_FLOW_ERROR; + + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstClockTime time; + GstMssDemuxStream *other; + GstDataQueueItem *item; + + other = iter->data; + if (other->eos) { + continue; + } + + if (gst_data_queue_peek (other->dataqueue, &item)) { + } else { + /* flushing */ + return GST_FLOW_WRONG_STATE; + } + + if (GST_IS_EVENT (item->object)) { + /* events have higher priority */ + current = other; + break; + } + time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object)); + if (time < cur_time) { + cur_time = time; + current = other; + } + } + + *stream = current; + if (current == NULL) + ret = GST_FLOW_UNEXPECTED; + return ret; +} + +static void +gst_mss_demux_stream_loop (GstMssDemux * mssdemux) +{ + GstMssDemuxStream *stream = NULL; + GstFlowReturn ret; + GstMiniObject *object = NULL; + GstDataQueueItem *item = NULL; + + GST_LOG_OBJECT (mssdemux, "Starting stream loop"); + + GST_OBJECT_LOCK (mssdemux); + if (mssdemux->update_bitrates) { + mssdemux->update_bitrates = FALSE; + GST_OBJECT_UNLOCK (mssdemux); + + GST_DEBUG_OBJECT (mssdemux, + "Starting streams reconfiguration due to bitrate changes"); + gst_mss_demux_reconfigure (mssdemux); + GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration"); + } else { + GST_OBJECT_UNLOCK (mssdemux); + } + + ret = gst_mss_demux_select_latest_stream (mssdemux, &stream); + + if (stream) + GST_DEBUG_OBJECT (mssdemux, + "Stream loop selected %p stream of pad %s. %d - %s", stream, + GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret)); + else + GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret, + gst_flow_get_name (ret)); + + switch (ret) { + case GST_FLOW_OK: + break; + case GST_FLOW_ERROR: + goto error; + case GST_FLOW_UNEXPECTED: + goto eos; + case GST_FLOW_WRONG_STATE: + GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task"); + goto stop; + default: + g_assert_not_reached (); + } + + GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s", + stream, GST_PAD_NAME (stream->pad)); + if (gst_data_queue_pop (stream->dataqueue, &item)) { + if (item->object) + object = gst_mini_object_ref (item->object); + item->destroy (item); + } else { + GST_DEBUG_OBJECT (mssdemux, + "Failed to get object from dataqueue on stream %p %s", stream, + GST_PAD_NAME (stream->pad)); + goto stop; + } + if (G_UNLIKELY (stream->pending_newsegment)) { gst_pad_push_event (stream->pad, stream->pending_newsegment); stream->pending_newsegment = NULL; } - GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s", - GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad)); - ret = gst_pad_push (stream->pad, buffer); + if (G_LIKELY (GST_IS_BUFFER (object))) { + GST_DEBUG_OBJECT (mssdemux, + "Pushing buffer %p %" GST_TIME_FORMAT " on pad %s", object, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)), + GST_PAD_NAME (stream->pad)); + ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object)); + } else if (GST_IS_EVENT (object)) { + if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) + stream->eos = TRUE; + GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object, + GST_PAD_NAME (stream->pad)); + gst_pad_push_event (stream->pad, GST_EVENT_CAST (object)); + } else { + g_return_if_reached (); + } + switch (ret) { case GST_FLOW_UNEXPECTED: goto eos; /* EOS ? */ @@ -887,22 +1136,25 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream) break; } - gst_mss_stream_advance_fragment (stream->manifest_stream); + GST_LOG_OBJECT (mssdemux, "Stream loop end"); return; eos: { - GstEvent *eos = gst_event_new_eos (); - GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s", - GST_DEBUG_PAD_NAME (stream->pad)); - gst_pad_push_event (stream->pad, eos); - gst_task_stop (stream->stream_task); + GST_DEBUG_OBJECT (mssdemux, "EOS on all pads"); + gst_task_stop (mssdemux->stream_task); return; } error: { GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment"); - gst_task_stop (stream->stream_task); + gst_task_stop (mssdemux->stream_task); + return; + } +stop: + { + GST_DEBUG_OBJECT (mssdemux, "Stopping streaming task"); + gst_task_stop (mssdemux->stream_task); return; } } diff --git a/ext/smoothstreaming/gstmssdemux.h b/ext/smoothstreaming/gstmssdemux.h index c279cc2b7e..ceb471bdaf 100644 --- a/ext/smoothstreaming/gstmssdemux.h +++ b/ext/smoothstreaming/gstmssdemux.h @@ -25,6 +25,7 @@ #include #include +#include #include "gstmssmanifest.h" #include "gsturidownloader.h" @@ -58,13 +59,15 @@ struct _GstMssDemuxStream { GstMssStream *manifest_stream; GstUriDownloader *downloader; + GstDataQueue *dataqueue; GstEvent *pending_newsegment; - /* Streaming task */ - GstTask *stream_task; - GStaticRecMutex stream_lock; + /* Downloading task */ + GstTask *download_task; + GStaticRecMutex download_lock; + gboolean eos; }; struct _GstMssDemux { @@ -84,8 +87,13 @@ struct _GstMssDemux { gboolean update_bitrates; + /* Streaming task */ + GstTask *stream_task; + GStaticRecMutex stream_lock; + /* properties */ guint64 connection_speed; /* in bps */ + guint data_queue_max_size; }; struct _GstMssDemuxClass {