dashdemux: correctly signal EOS on manifest end

Put EOS on the streams queues after the last fragment from the
last period for each stream. This way we keep it serialized
with the buffers and it will work when streams have different
ending times
This commit is contained in:
Thiago Santos 2013-01-29 15:58:50 -03:00
parent a19c689dfa
commit 2d85107299
4 changed files with 136 additions and 30 deletions

View file

@ -185,6 +185,14 @@ enum
#define DEFAULT_FAILED_COUNT 3
/* Custom internal event to signal end of period */
#define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED)
static GstEvent *
gst_event_new_dash_eop (void)
{
return gst_event_new_custom (GST_EVENT_DASH_EOP, NULL);
}
/* GObject */
static void gst_dash_demux_set_property (GObject * object, guint prop_id,
@ -496,11 +504,22 @@ _data_queue_item_destroy (GstDataQueueItem * item)
g_free (item);
}
static void
gst_dash_demux_stream_push_event (GstDashDemuxStream * stream, GstEvent * event)
{
GstDataQueueItem *item = g_new0 (GstDataQueueItem, 1);
item->object = GST_MINI_OBJECT_CAST (event);
item->destroy = (GDestroyNotify) _data_queue_item_destroy;
gst_data_queue_push (stream->queue, item);
}
static void
gst_dash_demux_stream_push_data (GstDashDemuxStream * stream,
GstBuffer * fragment)
{
GstDataQueueItem *item = g_new0 (GstDataQueueItem, 1);
GstDataQueueItem *item = g_new (GstDataQueueItem, 1);
item->object = GST_MINI_OBJECT_CAST (fragment);
item->duration = GST_BUFFER_DURATION (fragment);
@ -1011,9 +1030,15 @@ needs_pad_switch (GstDashDemux * demux)
GstCaps *srccaps = NULL;
GstBuffer *buffer;
if (stream->stream_end_of_period || stream->stream_eos)
continue;
if (!gst_data_queue_peek (stream->queue, &item))
continue;
if (!GST_IS_BUFFER (item->object))
continue;
buffer = GST_BUFFER_CAST (item->object);
gst_caps_replace (&stream->output_caps, GST_BUFFER_CAPS (buffer));
@ -1063,6 +1088,8 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
GSList *iter;
GstClockTime best_time;
GstDashDemuxStream *selected_stream;
gboolean eos = TRUE;
gboolean eop = TRUE;
GST_LOG_OBJECT (demux, "Starting stream loop");
@ -1102,10 +1129,20 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
GstDashDemuxStream *stream = iter->data;
GstDataQueueItem *item;
if (stream->stream_eos)
continue;
if (stream->stream_end_of_period) {
eos = FALSE;
continue;
}
eos = FALSE;
eop = FALSE;
if (!gst_data_queue_peek (stream->queue, &item))
continue;
if (GST_IS_BUFFER (item->object)) {
if (G_LIKELY (GST_IS_BUFFER (item->object))) {
if (GST_BUFFER_TIMESTAMP (item->object) < best_time) {
best_time = GST_BUFFER_TIMESTAMP (item->object);
selected_stream = stream;
@ -1125,36 +1162,59 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
if (!gst_data_queue_pop (selected_stream->queue, &item))
goto end;
buffer = GST_BUFFER_CAST (item->object);
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client,
selected_stream->index);
if (demux->need_segment) {
GstClockTime start =
GST_BUFFER_TIMESTAMP (buffer) + demux->position_shift;
/* And send a newsegment */
GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
GST_TIME_FORMAT, GST_TIME_ARGS (start));
for (iter = demux->streams, i = 0; iter; i++, iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_pad_push_event (stream->pad,
gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME,
start, GST_CLOCK_TIME_NONE, start));
if (G_LIKELY (GST_IS_BUFFER (item->object))) {
buffer = GST_BUFFER_CAST (item->object);
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client,
selected_stream->index);
if (demux->need_segment) {
GstClockTime start =
GST_BUFFER_TIMESTAMP (buffer) + demux->position_shift;
/* And send a newsegment */
GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
GST_TIME_FORMAT, GST_TIME_ARGS (start));
for (iter = demux->streams, i = 0; iter;
i++, iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
gst_pad_push_event (stream->pad,
gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME,
start, GST_CLOCK_TIME_NONE, start));
}
demux->need_segment = FALSE;
demux->position_shift = 0;
}
demux->need_segment = FALSE;
demux->position_shift = 0;
}
GST_DEBUG_OBJECT (demux,
"Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT, buffer, GST_BUFFER_OFFSET (buffer),
selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer));
item->destroy (item);
if ((ret != GST_FLOW_OK) && (active_stream
&& active_stream->mimeType == GST_STREAM_VIDEO))
goto error_pushing;
GST_DEBUG_OBJECT (demux,
"Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
GST_TIME_FORMAT, buffer, GST_BUFFER_OFFSET (buffer),
selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer));
item->destroy (item);
if ((ret != GST_FLOW_OK) && (active_stream
&& active_stream->mimeType == GST_STREAM_VIDEO))
goto error_pushing;
} else {
/* a GstEvent */
if (GST_EVENT_TYPE (item->object) == GST_EVENT_EOS) {
selected_stream->stream_end_of_period = TRUE;
selected_stream->stream_eos = TRUE;
} else if (GST_EVENT_TYPE (item->object) == GST_EVENT_DASH_EOP) {
selected_stream->stream_end_of_period = TRUE;
}
if (GST_EVENT_TYPE (item->object) != GST_EVENT_DASH_EOP) {
gst_pad_push_event (selected_stream->pad,
gst_event_ref (GST_EVENT_CAST (item->object)));
}
item->destroy (item);
}
} else {
if (eos) {
goto end_of_manifest;
} else if (eop) {
/* TODO advance to next period */
}
}
end:
@ -1750,11 +1810,23 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
guint stream_idx = stream->index;
GstBuffer *buffer;
if (stream->download_end_of_period)
continue;
if (!gst_mpd_client_get_next_fragment (demux->client,
stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) {
GstEvent *event = NULL;
GST_INFO_OBJECT (demux,
"This Period doesn't contain more fragments for stream %u",
stream_idx);
if (gst_mpd_client_has_next_period (demux->client)) {
event = gst_event_new_dash_eop ();
} else {
event = gst_event_new_eos ();
}
stream->download_end_of_period = TRUE;
gst_dash_demux_stream_push_event (stream, event);
continue;
}

View file

@ -62,6 +62,24 @@ struct _GstDashDemuxStream
GstCaps *output_caps;
GstCaps *input_caps;
/*
* Need to store the status for the download and
* stream tasks separately as they are working at
* different points of the stream timeline.
* The download task is ahead of the stream.
*
* The download_end_of_period is set when a stream
* has already downloaded all fragments for the current
* period.
*
* The stream_end_of_period is set when a stream
* has pushed all fragments for the current period
*/
gboolean download_end_of_period;
gboolean stream_end_of_period;
gboolean stream_eos;
GstDataQueue *queue;
};

View file

@ -3316,6 +3316,21 @@ gst_mpd_client_get_period_index (GstMpdClient * client)
return period_idx;
}
gboolean
gst_mpd_client_has_next_period (GstMpdClient * client)
{
GList *next_stream_period;
g_return_val_if_fail (client != NULL, FALSE);
g_return_val_if_fail (client->periods != NULL, FALSE);
GST_MPD_CLIENT_LOCK (client);
next_stream_period =
g_list_nth_data (client->periods, client->period_idx + 1);
GST_MPD_CLIENT_UNLOCK (client);
return next_stream_period != NULL;
}
void
gst_mpd_client_set_segment_index_for_all_streams (GstMpdClient * client,
guint segment_idx)

View file

@ -477,6 +477,7 @@ gboolean gst_mpd_client_is_live (GstMpdClient * client);
/* Period selection */
gboolean gst_mpd_client_set_period_index (GstMpdClient *client, guint period_idx);
guint gst_mpd_client_get_period_index (GstMpdClient *client);
gboolean gst_mpd_client_has_next_period (GstMpdClient *client);
/* Representation selection */
gint gst_mpdparser_get_rep_idx_with_max_bandwidth (GList *Representations, gint max_bandwidth);