mssdemux: remove the stream loop task

Download and push from the same task, makes code a lot simpler
to maintain. Also pushing from separate threads avoids deadlocking
when gst_pad_push blocks due to downstream queues being full
This commit is contained in:
Thiago Santos 2013-12-16 16:14:24 -03:00
parent 4e6e1315da
commit 67f3190301
2 changed files with 102 additions and 400 deletions

View file

@ -133,9 +133,11 @@ static gboolean gst_mss_demux_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
static void gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
GstMiniObject * obj);
static GstFlowReturn gst_mss_demux_stream_push (GstMssDemuxStream * stream,
GstBuffer * buffer);
static GstFlowReturn gst_mss_demux_stream_push_event (GstMssDemuxStream *
stream, GstEvent * event);
static GstFlowReturn gst_mss_demux_combine_flows (GstMssDemux * mssdemux);
static gboolean gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
@ -199,12 +201,6 @@ gst_mss_demux_init (GstMssDemux * mssdemux)
GST_DEBUG_FUNCPTR (gst_mss_demux_event));
gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
g_rec_mutex_init (&mssdemux->stream_lock);
mssdemux->stream_task =
gst_task_new ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux,
NULL);
gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
mssdemux->data_queue_max_size = DEFAULT_MAX_QUEUE_SIZE_BUFFERS;
mssdemux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
@ -212,18 +208,6 @@ gst_mss_demux_init (GstMssDemux * mssdemux)
mssdemux->group_id = G_MAXUINT;
}
static gboolean
_data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
guint64 time, gpointer checkdata)
{
GstMssDemuxStream *stream = checkdata;
GstMssDemux *mssdemux = stream->parent;
if (mssdemux->data_queue_max_size == 0)
return FALSE; /* never full */
return visible >= mssdemux->data_queue_max_size;
}
static GstMssDemuxStream *
gst_mss_demux_stream_new (GstMssDemux * mssdemux,
GstMssStream * manifeststream, GstPad * srcpad)
@ -232,8 +216,6 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
stream = g_new0 (GstMssDemuxStream, 1);
stream->downloader = gst_uri_downloader_new ();
stream->dataqueue =
gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream);
/* Downloading task */
g_rec_mutex_init (&stream->download_lock);
@ -261,8 +243,6 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
GST_DEBUG_PAD_NAME (stream->pad));
gst_uri_downloader_cancel (stream->downloader);
gst_task_stop (stream->download_task);
g_rec_mutex_lock (&stream->download_lock);
g_rec_mutex_unlock (&stream->download_lock);
GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
gst_task_join (stream->download_task);
GST_LOG_OBJECT (stream->parent, "Finished");
@ -282,10 +262,6 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
g_object_unref (stream->downloader);
stream->downloader = NULL;
}
if (stream->dataqueue) {
g_object_unref (stream->dataqueue);
stream->dataqueue = NULL;
}
if (stream->pad) {
gst_object_unref (stream->pad);
stream->pad = NULL;
@ -302,17 +278,10 @@ gst_mss_demux_reset (GstMssDemux * mssdemux)
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
if (stream->downloader)
gst_uri_downloader_cancel (stream->downloader);
gst_data_queue_set_flushing (stream->dataqueue, TRUE);
}
if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
gst_task_stop (mssdemux->stream_task);
g_rec_mutex_lock (&mssdemux->stream_lock);
g_rec_mutex_unlock (&mssdemux->stream_lock);
gst_task_join (mssdemux->stream_task);
}
if (mssdemux->manifest_buffer) {
@ -351,12 +320,6 @@ gst_mss_demux_dispose (GObject * object)
gst_mss_demux_reset (mssdemux);
if (mssdemux->stream_task) {
gst_object_unref (mssdemux->stream_task);
g_rec_mutex_clear (&mssdemux->stream_lock);
mssdemux->stream_task = NULL;
}
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@ -465,8 +428,6 @@ gst_mss_demux_start (GstMssDemux * mssdemux)
GstMssDemuxStream *stream = iter->data;
gst_task_start (stream->download_task);
}
gst_task_start (mssdemux->stream_task);
}
static gboolean
@ -527,14 +488,11 @@ gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_data_queue_set_flushing (stream->dataqueue, TRUE);
stream->cancelled = TRUE;
if (immediate)
gst_uri_downloader_cancel (stream->downloader);
gst_task_pause (stream->download_task);
}
gst_task_pause (mssdemux->stream_task);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
@ -542,7 +500,6 @@ gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
stream->cancelled = FALSE;
stream->download_error_count = 0;
}
g_rec_mutex_lock (&mssdemux->stream_lock);
}
static void
@ -554,14 +511,11 @@ gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
gst_uri_downloader_reset (stream->downloader);
g_rec_mutex_unlock (&stream->download_lock);
}
g_rec_mutex_unlock (&mssdemux->stream_lock);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_data_queue_set_flushing (stream->dataqueue, FALSE);
gst_task_start (stream->download_task);
}
gst_task_start (mssdemux->stream_task);
}
static gboolean
@ -622,7 +576,6 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
if (flags & GST_SEEK_FLAG_FLUSH) {
stream->last_ret = GST_FLOW_OK;
}
gst_data_queue_flush (stream->dataqueue);
gst_event_replace (&stream->pending_newsegment, newsegment);
}
gst_event_unref (newsegment);
@ -648,8 +601,8 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
if (stream->pad == pad) {
GST_OBJECT_LOCK (mssdemux);
if (GST_TASK_STATE (stream->download_task) == GST_TASK_PAUSED
&& stream->last_ret == GST_FLOW_NOT_LINKED) {
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (stream->pad, "Received reconfigure");
stream->restart_download = TRUE;
gst_task_start (stream->download_task);
@ -998,9 +951,10 @@ gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
g_object_unref (downloader);
}
static void
static GstEvent *
gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
{
GstEvent *capsevent = NULL;
GstMssDemux *mssdemux = stream->parent;
guint64 new_bitrate;
@ -1015,7 +969,6 @@ gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
"Current stream download bitrate %" G_GUINT64_FORMAT, new_bitrate);
if (gst_mss_stream_select_bitrate (stream->manifest_stream, new_bitrate)) {
GstEvent *capsevent;
GstCaps *caps;
caps = gst_mss_stream_get_caps (stream->manifest_stream);
@ -1032,47 +985,9 @@ gst_mss_demux_reconfigure_stream (GstMssDemuxStream * stream)
gst_mss_stream_get_current_bitrate (stream->manifest_stream), caps);
capsevent = gst_event_new_caps (stream->caps);
gst_mss_demux_stream_store_object (stream,
GST_MINI_OBJECT_CAST (capsevent));
GST_DEBUG_OBJECT (stream->pad, "Finished streams reconfiguration");
}
}
static void
_free_data_queue_item (gpointer obj)
{
GstDataQueueItem *item = obj;
gst_mini_object_unref (item->object);
g_slice_free (GstDataQueueItem, item);
}
static void
gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
GstMiniObject * obj)
{
GstDataQueueItem *item;
gboolean ret = FALSE;
item = g_slice_new (GstDataQueueItem);
item->object = (GstMiniObject *) obj;
item->duration = 0; /* we don't care */
item->size = 0;
item->visible = TRUE;
item->destroy = (GDestroyNotify) _free_data_queue_item;
if (G_LIKELY (GST_IS_BUFFER (obj))) {
ret = gst_data_queue_push (stream->dataqueue, item);
} else {
ret = gst_data_queue_push_force (stream->dataqueue, item);
}
if (!ret) {
GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
item->destroy (item);
}
return capsevent;
}
static GstFlowReturn
@ -1089,14 +1004,14 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
/* special case for not-linked streams */
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mssdemux, "Skipping download for not-linked stream %p",
GST_DEBUG_OBJECT (stream->pad, "Skipping download for not-linked stream %p",
stream);
return GST_FLOW_NOT_LINKED;
}
before_download = g_get_real_time ();
GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
GST_DEBUG_OBJECT (stream->pad, "Getting url for stream");
ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
switch (ret) {
case GST_FLOW_OK:
@ -1186,6 +1101,7 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
GstBuffer *buffer = NULL;
gboolean buffer_downloaded = FALSE;
GstEvent *gap = NULL;
GstEvent *capsevent = NULL;
GST_LOG_OBJECT (stream->pad, "download loop start");
@ -1214,9 +1130,7 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
ts = MAX (ts, stream->next_timestamp);
GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
"position %" GST_TIME_FORMAT ", catching up until segment position %"
GST_TIME_FORMAT,
GST_TIME_ARGS (ts), GST_TIME_ARGS (mssdemux->segment.position));
"position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
if (GST_CLOCK_TIME_IS_VALID (ts)) {
gst_mss_stream_seek (stream->manifest_stream, ts);
@ -1226,33 +1140,16 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
}
}
/* This stream might be entering into catching up mode,
* meaning that it will push buffers from this same download thread
* until it reaches the segment position.
*
* The reason for this is that in case of stream switching, the other
* stream that was previously active might be blocking the stream_loop
* in case it is ahead enough that all queues are filled.
* In this case, it is possible that a downstream input-selector is
* blocking waiting for the currently active stream to reach the
* same position of the old linked stream because of the 'sync-streams'
* behavior.
*
* We can push from this thread up to segment position as all other
* streams should be around the same timestamp.
*/
stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
stream->eos = FALSE;
gst_data_queue_set_flushing (stream->dataqueue, FALSE);
stream->restart_download = FALSE;
stream->last_ret = GST_FLOW_OK;
}
gst_mss_demux_reconfigure_stream (stream);
capsevent = gst_mss_demux_reconfigure_stream (stream);
GST_OBJECT_UNLOCK (mssdemux);
if (gap != NULL)
if (G_UNLIKELY (gap != NULL))
gst_pad_push_event (stream->pad, gap);
if (G_UNLIKELY (capsevent != NULL))
gst_pad_push_event (stream->pad, capsevent);
ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
buffer_downloaded = buffer != NULL;
@ -1264,174 +1161,75 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
}
if (buffer) {
gboolean catch_up = FALSE;
/* Check if this stream is on catch up mode */
if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
GST_DEBUG_OBJECT (stream->pad,
"Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
GST_TIME_ARGS (mssdemux->segment.position),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
if (GST_BUFFER_TIMESTAMP (buffer) < mssdemux->segment.position) {
catch_up = TRUE;
} else {
GST_OBJECT_LOCK (mssdemux);
stream->last_ret = GST_FLOW_OK;
gst_task_start (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
}
}
GST_DEBUG_OBJECT (stream->pad,
"%s buffer for stream. Timestamp: %" GST_TIME_FORMAT
" Duration: %" GST_TIME_FORMAT,
catch_up ? "Catch up push for" : "Storing",
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
if (catch_up) {
ret = gst_pad_push (stream->pad, buffer);
if (G_LIKELY (ret == GST_FLOW_OK))
stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
else {
stream->last_ret = ret;
}
} else {
gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer));
}
ret = gst_mss_demux_stream_push (stream, buffer);
}
GST_OBJECT_LOCK (mssdemux);
stream->last_ret = ret;
switch (ret) {
case GST_FLOW_OK:
break; /* all is good, let's go */
case GST_FLOW_EOS:
goto eos;
case GST_FLOW_ERROR:
goto error;
GST_DEBUG_OBJECT (stream->pad, "EOS, stopping download loop");
gst_mss_demux_stream_push_event (stream, gst_event_new_eos ());
gst_task_pause (stream->download_task);
break;
case GST_FLOW_NOT_LINKED:
goto notlinked;
case GST_FLOW_FLUSHING:
goto flushing;
gst_task_pause (stream->download_task);
if (gst_mss_demux_combine_flows (mssdemux) == GST_FLOW_NOT_LINKED) {
GST_ELEMENT_ERROR (mssdemux, STREAM, FAILED,
(_("Internal data stream error.")),
("stream stopped, reason %s",
gst_flow_get_name (GST_FLOW_NOT_LINKED)));
}
break;
case GST_FLOW_FLUSHING:{
GSList *iter;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *other;
other = iter->data;
gst_task_pause (other->download_task);
}
}
break;
default:
if (ret < GST_FLOW_ERROR)
goto error;
if (ret <= GST_FLOW_ERROR) {
if (buffer_downloaded) {
GST_ERROR_OBJECT (mssdemux, "Error while pushing fragment");
} else {
GST_WARNING_OBJECT (mssdemux, "Error while downloading fragment");
if (++stream->download_error_count >=
DOWNLOAD_RATE_MAX_HISTORY_LENGTH) {
GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
(_("Couldn't download fragments")),
("fragment downloading has failed too much consecutive times"));
}
}
}
break;
}
stream->download_error_count = 0;
GST_OBJECT_UNLOCK (mssdemux);
if (buffer_downloaded) {
stream->download_error_count = 0;
gst_mss_stream_advance_fragment (stream->manifest_stream);
}
GST_LOG_OBJECT (stream->pad, "download loop end");
return;
eos:
{
GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
gst_mss_demux_stream_store_object (stream,
GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
GST_OBJECT_LOCK (mssdemux);
gst_task_pause (stream->download_task);
gst_task_start (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
return;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
if (++stream->download_error_count >= DOWNLOAD_RATE_MAX_HISTORY_LENGTH) {
GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
(_("Couldn't download fragments")),
("fragment downloading has failed too much consecutive times"));
}
return;
}
cancelled:
{
GST_DEBUG_OBJECT (mssdemux, "Stream %p has been cancelled", stream);
gst_task_pause (stream->download_task);
return;
}
notlinked:
{
GST_OBJECT_LOCK (mssdemux);
if (stream->last_ret == GST_FLOW_NOT_LINKED) {
gst_task_pause (stream->download_task);
gst_data_queue_set_flushing (stream->dataqueue, TRUE);
}
GST_OBJECT_UNLOCK (mssdemux);
return;
}
flushing:
{
GSList *iter;
GST_OBJECT_LOCK (mssdemux);
gst_task_pause (mssdemux->stream_task);
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *other;
other = iter->data;
gst_task_pause (other->download_task);
gst_data_queue_set_flushing (other->dataqueue, TRUE);
}
GST_OBJECT_UNLOCK (mssdemux);
return;
}
}
static GstFlowReturn
gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
GstMssDemuxStream ** stream)
{
GstFlowReturn ret = GST_FLOW_OK;
GstMssDemuxStream *current = NULL;
GstClockTime cur_time = GST_CLOCK_TIME_NONE;
GSList *iter;
if (!mssdemux->streams)
return GST_FLOW_ERROR;
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstClockTime time;
GstMssDemuxStream *other;
GstDataQueueItem *item;
other = iter->data;
if (other->eos || other->last_ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (mssdemux, "Skipping stream %p eos:%d last-ret:%d",
other, other->eos, other->last_ret);
continue;
}
if (!gst_data_queue_peek (other->dataqueue, &item)) {
/* flushing */
if (other->last_ret == GST_FLOW_NOT_LINKED) {
/* might have been unlinked and won't receive data for now */
continue;
}
return GST_FLOW_FLUSHING;
}
if (GST_IS_EVENT (item->object)) {
/* events have higher priority */
current = other;
break;
}
time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
if (time < cur_time) {
cur_time = time;
current = other;
}
}
*stream = current;
if (current == NULL)
ret = GST_FLOW_EOS;
return ret;
}
static GstFlowReturn
@ -1455,144 +1253,53 @@ gst_mss_demux_combine_flows (GstMssDemux * mssdemux)
return GST_FLOW_OK;
}
static void
gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
static gboolean
gst_mss_demux_stream_push (GstMssDemuxStream * stream, GstBuffer * buf)
{
GstMssDemuxStream *stream = NULL;
GstFlowReturn ret;
GstMiniObject *object = NULL;
GstDataQueueItem *item = NULL;
GST_LOG_OBJECT (mssdemux, "Starting stream loop");
ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
if (stream)
GST_DEBUG_OBJECT (mssdemux,
"Stream loop selected %p stream of pad %s. %d - %s", stream,
GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
else
GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
gst_flow_get_name (ret));
/* Lock as this may change the tasks state */
GST_OBJECT_LOCK (mssdemux);
switch (ret) {
case GST_FLOW_OK:
break;
case GST_FLOW_ERROR:
goto error;
case GST_FLOW_EOS:
goto eos;
case GST_FLOW_FLUSHING:
GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
goto stop;
default:
g_assert_not_reached ();
}
GST_OBJECT_UNLOCK (mssdemux);
GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
stream, GST_PAD_NAME (stream->pad));
if (gst_data_queue_pop (stream->dataqueue, &item)) {
if (item->object)
object = gst_mini_object_ref (item->object);
item->destroy (item);
} else {
GST_DEBUG_OBJECT (mssdemux,
"Failed to get object from dataqueue on stream %p %s", stream,
GST_PAD_NAME (stream->pad));
goto stop;
}
if (G_UNLIKELY (stream->pending_newsegment)) {
gst_pad_push_event (stream->pad, stream->pending_newsegment);
stream->pending_newsegment = NULL;
}
if (G_LIKELY (GST_IS_BUFFER (object))) {
if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
GST_DEBUG_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
GST_TIME_ARGS (stream->next_timestamp));
GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
}
GST_DEBUG_OBJECT (mssdemux,
"Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
" discont:%d on pad %s:%s", object,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
GST_DEBUG_PAD_NAME (stream->pad));
stream->next_timestamp =
GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
stream->have_data = TRUE;
mssdemux->segment.position = GST_BUFFER_TIMESTAMP (object);
ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
GST_DEBUG_OBJECT (mssdemux, "Pushed on pad %s:%s result: %d (%s)",
GST_DEBUG_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
} else if (GST_IS_EVENT (object)) {
if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
stream->eos = TRUE;
}
GST_DEBUG_OBJECT (mssdemux, "Pushing event %" GST_PTR_FORMAT " on pad %s",
object, GST_PAD_NAME (stream->pad));
gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
ret = GST_FLOW_EOS;
} else {
g_return_if_reached ();
if (GST_BUFFER_TIMESTAMP (buf) != stream->next_timestamp) {
GST_DEBUG_OBJECT (stream->pad, "Marking buffer %p as discont buffer:%"
GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, buf,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (stream->next_timestamp));
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
}
/* Lock as this may change the tasks state */
GST_OBJECT_LOCK (mssdemux);
stream->last_ret = ret;
ret = gst_mss_demux_combine_flows (mssdemux);
switch (ret) {
case GST_FLOW_EOS:
goto eos;
case GST_FLOW_ERROR:
goto error;
case GST_FLOW_FLUSHING:
goto stop;
case GST_FLOW_NOT_LINKED:
/* stream won't download any more data until it gets a reconfigure */
break;
case GST_FLOW_OK:
break;
default:
if (ret < GST_FLOW_ERROR)
goto error;
break;
}
GST_OBJECT_UNLOCK (mssdemux);
GST_DEBUG_OBJECT (stream->pad,
"Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
" discont:%d", buf,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DISCONT));
GST_LOG_OBJECT (mssdemux, "Stream loop end");
return;
stream->next_timestamp =
GST_BUFFER_TIMESTAMP (buf) + GST_BUFFER_DURATION (buf);
eos:
{
GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
gst_task_pause (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
return;
}
error:
{
GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
gst_task_pause (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
return;
}
stop:
{
GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task");
gst_task_pause (mssdemux->stream_task);
GST_OBJECT_UNLOCK (mssdemux);
return;
}
stream->have_data = TRUE;
stream->parent->segment.position = GST_BUFFER_TIMESTAMP (buf);
ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (buf));
GST_DEBUG_OBJECT (stream->pad, "Pushed. result: %d (%s)",
ret, gst_flow_get_name (ret));
return ret;
}
static gboolean
gst_mss_demux_stream_push_event (GstMssDemuxStream * stream, GstEvent * event)
{
gboolean ret;
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
stream->eos = TRUE;
}
GST_DEBUG_OBJECT (stream->pad, "Pushing event %" GST_PTR_FORMAT, event);
ret = gst_pad_push_event (stream->pad, event);
return ret;
}

View file

@ -62,7 +62,6 @@ struct _GstMssDemuxStream {
GstMssStream *manifest_stream;
GstUriDownloader *downloader;
GstDataQueue *dataqueue;
GstEvent *pending_newsegment;
@ -106,10 +105,6 @@ struct _GstMssDemux {
gboolean update_bitrates;
/* Streaming task */
GstTask *stream_task;
GRecMutex stream_lock;
/* properties */
guint64 connection_speed; /* in bps */
guint data_queue_max_size;