mssdemux: rewriting pad tasks so that buffers are pushed by ts order

Use pad tasks to download data and an extra task that gets the earlier
buffer (with the smallest timestamp) and pushes on the corresponding
pad.

This prevents that the audio stream rushes ahead on buffers as its
fragments should be smaller
This commit is contained in:
Thiago Santos 2013-01-14 13:21:10 -03:00
parent fba63178fe
commit c2ae981e6d
2 changed files with 318 additions and 58 deletions

View file

@ -85,7 +85,8 @@ static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
@ -143,6 +144,23 @@ gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
gst_pad_set_event_function (mssdemux->sinkpad,
GST_DEBUG_FUNCPTR (gst_mss_demux_event));
gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
g_static_rec_mutex_init (&mssdemux->stream_lock);
mssdemux->stream_task =
gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux);
gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
}
static gboolean
_data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
guint64 time, gpointer checkdata)
{
GstMssDemuxStream *stream = checkdata;
GstMssDemux *mssdemux = stream->parent;
if (mssdemux->data_queue_max_size == 0)
return FALSE; /* never full */
return visible >= mssdemux->data_queue_max_size;
}
static GstMssDemuxStream *
@ -153,12 +171,13 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
stream = g_new0 (GstMssDemuxStream, 1);
stream->downloader = gst_uri_downloader_new ();
stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream);
/* Streaming task */
g_static_rec_mutex_init (&stream->stream_lock);
stream->stream_task =
gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
gst_task_set_lock (stream->stream_task, &stream->stream_lock);
/* Downloading task */
g_static_rec_mutex_init (&stream->download_lock);
stream->download_task =
gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream);
gst_task_set_lock (stream->download_task, &stream->download_lock);
stream->pad = srcpad;
stream->manifest_stream = manifeststream;
@ -170,20 +189,20 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
static void
gst_mss_demux_stream_free (GstMssDemuxStream * stream)
{
if (stream->stream_task) {
if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
if (stream->download_task) {
if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
gst_task_stop (stream->stream_task);
g_static_rec_mutex_lock (&stream->stream_lock);
g_static_rec_mutex_unlock (&stream->stream_lock);
gst_task_stop (stream->download_task);
g_static_rec_mutex_lock (&stream->download_lock);
g_static_rec_mutex_unlock (&stream->download_lock);
GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
gst_task_join (stream->stream_task);
gst_task_join (stream->download_task);
GST_LOG_OBJECT (stream->parent, "Finished");
}
gst_object_unref (stream->stream_task);
g_static_rec_mutex_free (&stream->stream_lock);
stream->stream_task = NULL;
gst_object_unref (stream->download_task);
g_static_rec_mutex_free (&stream->download_lock);
stream->download_task = NULL;
}
if (stream->pending_newsegment) {
@ -196,6 +215,10 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
g_object_unref (stream->downloader);
stream->downloader = NULL;
}
if (stream->dataqueue) {
g_object_unref (stream->dataqueue);
stream->dataqueue = NULL;
}
if (stream->pad) {
gst_object_unref (stream->pad);
stream->pad = NULL;
@ -207,6 +230,14 @@ static void
gst_mss_demux_reset (GstMssDemux * mssdemux)
{
GSList *iter;
if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
gst_task_stop (mssdemux->stream_task);
g_static_rec_mutex_lock (&mssdemux->stream_lock);
g_static_rec_mutex_unlock (&mssdemux->stream_lock);
gst_task_join (mssdemux->stream_task);
}
if (mssdemux->manifest_buffer) {
gst_buffer_unref (mssdemux->manifest_buffer);
mssdemux->manifest_buffer = NULL;
@ -233,7 +264,13 @@ gst_mss_demux_reset (GstMssDemux * mssdemux)
static void
gst_mss_demux_dispose (GObject * object)
{
/* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */
GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object);
if (mssdemux->stream_task) {
gst_object_unref (mssdemux->stream_task);
g_static_rec_mutex_free (&mssdemux->stream_lock);
mssdemux->stream_task = NULL;
}
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@ -325,8 +362,10 @@ gst_mss_demux_start (GstMssDemux * mssdemux)
GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_task_start (stream->stream_task);
gst_task_start (stream->download_task);
}
gst_task_start (mssdemux->stream_task);
}
static gboolean
@ -378,17 +417,23 @@ static void
gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
{
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_data_queue_set_flushing (stream->dataqueue, TRUE);
if (immediate)
gst_uri_downloader_cancel (stream->downloader);
gst_task_pause (stream->stream_task);
gst_task_pause (stream->download_task);
}
gst_task_pause (mssdemux->stream_task);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
g_static_rec_mutex_lock (&stream->stream_lock);
g_static_rec_mutex_lock (&stream->download_lock);
}
g_static_rec_mutex_lock (&mssdemux->stream_lock);
}
static void
@ -397,13 +442,16 @@ gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
g_static_rec_mutex_unlock (&stream->stream_lock);
g_static_rec_mutex_unlock (&stream->download_lock);
}
g_static_rec_mutex_unlock (&mssdemux->stream_lock);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_task_start (stream->stream_task);
gst_data_queue_set_flushing (stream->dataqueue, FALSE);
gst_task_start (stream->download_task);
}
gst_task_start (mssdemux->stream_task);
}
static gboolean
@ -458,6 +506,8 @@ gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
stream->eos = FALSE;
gst_data_queue_flush (stream->dataqueue);
stream->pending_newsegment = gst_event_ref (newsegment);
}
gst_event_unref (newsegment);
@ -727,7 +777,7 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
GSList *oldpads = NULL;
GSList *iter;
gst_mss_demux_stop_tasks (mssdemux, FALSE);
gst_mss_demux_stop_tasks (mssdemux, TRUE);
if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
mssdemux->connection_speed)) {
@ -736,15 +786,46 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
GstPad *oldpad = stream->pad;
GstClockTime ts =
gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
GstClockTime ts = GST_CLOCK_TIME_NONE;
oldpads = g_slist_prepend (oldpads, oldpad);
/* since we are flushing the queue, get the next un-pushed timestamp to seek
* and avoid gaps */
gst_data_queue_set_flushing (stream->dataqueue, FALSE);
if (!gst_data_queue_is_empty (stream->dataqueue)) {
GstDataQueueItem *item = NULL;
while (!gst_data_queue_is_empty (stream->dataqueue)
&& !GST_CLOCK_TIME_IS_VALID (ts)) {
gst_data_queue_pop (stream->dataqueue, &item);
if (!item) {
g_assert_not_reached ();
break;
}
if (GST_IS_BUFFER (item->object)) {
GstBuffer *buffer = GST_BUFFER_CAST (item->object);
ts = GST_BUFFER_TIMESTAMP (buffer);
}
item->destroy (item);
}
}
if (!GST_CLOCK_TIME_IS_VALID (ts)) {
ts = gst_mss_stream_get_fragment_gst_timestamp
(stream->manifest_stream);
}
GST_DEBUG_OBJECT (mssdemux,
"Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream,
GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
gst_mss_stream_seek (stream->manifest_stream, ts);
gst_data_queue_flush (stream->dataqueue);
stream->pad = _create_pad (mssdemux, stream->manifest_stream);
/* TODO keep the same playback rate */
stream->pending_newsegment =
gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, ts, -1, ts);
gst_mss_demux_expose_stream (mssdemux, stream);
gst_pad_push_event (oldpad, gst_event_new_eos ());
@ -763,6 +844,37 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
gst_mss_demux_restart_tasks (mssdemux);
}
static void
_free_data_queue_item (gpointer obj)
{
GstDataQueueItem *item = obj;
gst_mini_object_unref (item->object);
g_slice_free (GstDataQueueItem, item);
}
static void
gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
GstMiniObject * obj)
{
GstDataQueueItem *item;
item = g_slice_new (GstDataQueueItem);
item->object = (GstMiniObject *) obj;
item->duration = 0; /* we don't care */
item->size = 0;
item->visible = TRUE;
item->destroy = (GDestroyNotify) _free_data_queue_item;
if (!gst_data_queue_push (stream->dataqueue, item)) {
GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
gst_mini_object_unref (obj);
g_slice_free (GstDataQueueItem, item);
}
}
static GstFlowReturn
gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
GstBuffer ** buffer)
@ -811,7 +923,17 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
GST_BUFFER_DURATION (_buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
*buffer = _buffer;
if (buffer)
*buffer = _buffer;
if (_buffer) {
GST_DEBUG_OBJECT (mssdemux,
"Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT,
stream, GST_PAD_NAME (stream->pad),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)));
gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
}
return ret;
no_url_error:
@ -819,39 +941,26 @@ no_url_error:
GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
(_("Failed to get fragment URL.")),
("An error happened when getting fragment URL"));
gst_task_stop (stream->stream_task);
gst_task_stop (stream->download_task);
return GST_FLOW_ERROR;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
gst_task_stop (stream->stream_task);
gst_task_stop (stream->download_task);
return GST_FLOW_ERROR;
}
}
static void
gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
gst_mss_demux_download_loop (GstMssDemuxStream * stream)
{
GstMssDemux *mssdemux = stream->parent;
GstBuffer *buffer = NULL;
GstFlowReturn ret;
GST_OBJECT_LOCK (mssdemux);
if (mssdemux->update_bitrates) {
mssdemux->update_bitrates = FALSE;
GST_OBJECT_UNLOCK (mssdemux);
GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
GST_DEBUG_OBJECT (mssdemux,
"Starting streams reconfiguration due to bitrate changes");
g_thread_create ((GThreadFunc) gst_mss_demux_reconfigure, mssdemux, FALSE,
NULL);
GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
gst_task_stop (stream->stream_task);
return;
} else {
GST_OBJECT_UNLOCK (mssdemux);
}
ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
switch (ret) {
@ -867,14 +976,154 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
g_assert (buffer != NULL);
gst_mss_stream_advance_fragment (stream->manifest_stream);
GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
return;
eos:
{
GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
gst_mss_demux_stream_store_object (stream,
GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
gst_task_stop (stream->download_task);
return;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
gst_task_stop (stream->download_task);
return;
}
}
static GstFlowReturn
gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
GstMssDemuxStream ** stream)
{
GstFlowReturn ret = GST_FLOW_OK;
GstMssDemuxStream *current = NULL;
GstClockTime cur_time = GST_CLOCK_TIME_NONE;
GSList *iter;
if (!mssdemux->streams)
return GST_FLOW_ERROR;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstClockTime time;
GstMssDemuxStream *other;
GstDataQueueItem *item;
other = iter->data;
if (other->eos) {
continue;
}
if (gst_data_queue_peek (other->dataqueue, &item)) {
} else {
/* flushing */
return GST_FLOW_WRONG_STATE;
}
if (GST_IS_EVENT (item->object)) {
/* events have higher priority */
current = other;
break;
}
time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
if (time < cur_time) {
cur_time = time;
current = other;
}
}
*stream = current;
if (current == NULL)
ret = GST_FLOW_UNEXPECTED;
return ret;
}
static void
gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
{
GstMssDemuxStream *stream = NULL;
GstFlowReturn ret;
GstMiniObject *object = NULL;
GstDataQueueItem *item = NULL;
GST_LOG_OBJECT (mssdemux, "Starting stream loop");
GST_OBJECT_LOCK (mssdemux);
if (mssdemux->update_bitrates) {
mssdemux->update_bitrates = FALSE;
GST_OBJECT_UNLOCK (mssdemux);
GST_DEBUG_OBJECT (mssdemux,
"Starting streams reconfiguration due to bitrate changes");
gst_mss_demux_reconfigure (mssdemux);
GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
} else {
GST_OBJECT_UNLOCK (mssdemux);
}
ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
if (stream)
GST_DEBUG_OBJECT (mssdemux,
"Stream loop selected %p stream of pad %s. %d - %s", stream,
GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
else
GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
gst_flow_get_name (ret));
switch (ret) {
case GST_FLOW_OK:
break;
case GST_FLOW_ERROR:
goto error;
case GST_FLOW_UNEXPECTED:
goto eos;
case GST_FLOW_WRONG_STATE:
GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
goto stop;
default:
g_assert_not_reached ();
}
GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
stream, GST_PAD_NAME (stream->pad));
if (gst_data_queue_pop (stream->dataqueue, &item)) {
if (item->object)
object = gst_mini_object_ref (item->object);
item->destroy (item);
} else {
GST_DEBUG_OBJECT (mssdemux,
"Failed to get object from dataqueue on stream %p %s", stream,
GST_PAD_NAME (stream->pad));
goto stop;
}
if (G_UNLIKELY (stream->pending_newsegment)) {
gst_pad_push_event (stream->pad, stream->pending_newsegment);
stream->pending_newsegment = NULL;
}
GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s",
GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
ret = gst_pad_push (stream->pad, buffer);
if (G_LIKELY (GST_IS_BUFFER (object))) {
GST_DEBUG_OBJECT (mssdemux,
"Pushing buffer %p %" GST_TIME_FORMAT " on pad %s", object,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
GST_PAD_NAME (stream->pad));
ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
} else if (GST_IS_EVENT (object)) {
if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
stream->eos = TRUE;
GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
GST_PAD_NAME (stream->pad));
gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
} else {
g_return_if_reached ();
}
switch (ret) {
case GST_FLOW_UNEXPECTED:
goto eos; /* EOS ? */
@ -887,22 +1136,25 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
break;
}
gst_mss_stream_advance_fragment (stream->manifest_stream);
GST_LOG_OBJECT (mssdemux, "Stream loop end");
return;
eos:
{
GstEvent *eos = gst_event_new_eos ();
GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
gst_pad_push_event (stream->pad, eos);
gst_task_stop (stream->stream_task);
GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
gst_task_stop (mssdemux->stream_task);
return;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
gst_task_stop (stream->stream_task);
gst_task_stop (mssdemux->stream_task);
return;
}
stop:
{
GST_DEBUG_OBJECT (mssdemux, "Stopping streaming task");
gst_task_stop (mssdemux->stream_task);
return;
}
}

View file

@ -25,6 +25,7 @@
#include <gst/gst.h>
#include <gst/base/gstadapter.h>
#include <gst/base/gstdataqueue.h>
#include "gstmssmanifest.h"
#include "gsturidownloader.h"
@ -58,13 +59,15 @@ struct _GstMssDemuxStream {
GstMssStream *manifest_stream;
GstUriDownloader *downloader;
GstDataQueue *dataqueue;
GstEvent *pending_newsegment;
/* Streaming task */
GstTask *stream_task;
GStaticRecMutex stream_lock;
/* Downloading task */
GstTask *download_task;
GStaticRecMutex download_lock;
gboolean eos;
};
struct _GstMssDemux {
@ -84,8 +87,13 @@ struct _GstMssDemux {
gboolean update_bitrates;
/* Streaming task */
GstTask *stream_task;
GStaticRecMutex stream_lock;
/* properties */
guint64 connection_speed; /* in bps */
guint data_queue_max_size;
};
struct _GstMssDemuxClass {