gst/asfdemux/gstasfdemux.*: Activate streams (ie. add the pads to the element) depending on whether we actually get d...

Original commit message from CVS:
* gst/asfdemux/gstasfdemux.c: (gst_asf_demux_reset),
(gst_asf_demux_chain_headers),
(gst_asf_demux_parse_data_object_start), (all_streams_prerolled),
(gst_asf_demux_have_mutually_exclusive_active_stream),
(gst_asf_demux_check_activate_streams),
(gst_asf_demux_find_stream_with_complete_payload),
(gst_asf_demux_push_complete_payloads), (gst_asf_demux_loop),
(gst_asf_demux_activate_ext_props_streams),
(gst_asf_demux_process_object):
* gst/asfdemux/gstasfdemux.h:
Activate streams (ie. add the pads to the element) depending on
whether we actually get data for those streams within the ASF
preroll value specified. Currently only done in pull-mode though
(this will fix problems with playbin hanging on mms streams once
we use this in push-mode as well).
This commit is contained in:
Tim-Philipp Müller 2007-05-07 13:51:43 +00:00
parent f7abd8bbc8
commit 08c63b6800
3 changed files with 252 additions and 67 deletions

View file

@ -1,3 +1,21 @@
2007-05-07 Tim-Philipp Müller <tim at centricular dot net>
* gst/asfdemux/gstasfdemux.c: (gst_asf_demux_reset),
(gst_asf_demux_chain_headers),
(gst_asf_demux_parse_data_object_start), (all_streams_prerolled),
(gst_asf_demux_have_mutually_exclusive_active_stream),
(gst_asf_demux_check_activate_streams),
(gst_asf_demux_find_stream_with_complete_payload),
(gst_asf_demux_push_complete_payloads), (gst_asf_demux_loop),
(gst_asf_demux_activate_ext_props_streams),
(gst_asf_demux_process_object):
* gst/asfdemux/gstasfdemux.h:
Activate streams (ie. add the pads to the element) depending on
whether we actually get data for those streams within the ASF
preroll value specified. Currently only done in pull-mode though
(this will fix problems with playbin hanging on mms streams once
we use this in push-mode as well).
2007-05-04 Tim-Philipp Müller <tim at centricular dot net> 2007-05-04 Tim-Philipp Müller <tim at centricular dot net>
* gst/asfdemux/asfpacket.c: (gst_asf_payload_queue_for_stream): * gst/asfdemux/asfpacket.c: (gst_asf_payload_queue_for_stream):

View file

@ -107,6 +107,8 @@ static gboolean
gst_asf_demux_parse_data_object_start (GstASFDemux * demux, guint8 * data); gst_asf_demux_parse_data_object_start (GstASFDemux * demux, guint8 * data);
static void gst_asf_demux_descramble_buffer (GstASFDemux * demux, static void gst_asf_demux_descramble_buffer (GstASFDemux * demux,
AsfStream * stream, GstBuffer ** p_buffer); AsfStream * stream, GstBuffer ** p_buffer);
static void gst_asf_demux_activate_stream (GstASFDemux * demux,
AsfStream * stream);
GST_BOILERPLATE (GstASFDemux, gst_asf_demux, GstElement, GST_TYPE_ELEMENT); GST_BOILERPLATE (GstASFDemux, gst_asf_demux, GstElement, GST_TYPE_ELEMENT);
@ -205,6 +207,7 @@ gst_asf_demux_reset (GstASFDemux * demux)
demux->num_audio_streams = 0; demux->num_audio_streams = 0;
demux->num_video_streams = 0; demux->num_video_streams = 0;
demux->num_streams = 0; demux->num_streams = 0;
demux->activated_streams = FALSE;
demux->first_ts = GST_CLOCK_TIME_NONE; demux->first_ts = GST_CLOCK_TIME_NONE;
demux->state = GST_ASF_DEMUX_STATE_HEADER; demux->state = GST_ASF_DEMUX_STATE_HEADER;
demux->seekable = FALSE; demux->seekable = FALSE;
@ -648,6 +651,11 @@ gst_asf_demux_chain_headers (GstASFDemux * demux)
if (demux->num_streams == 0) if (demux->num_streams == 0)
goto no_streams; goto no_streams;
/* FIXME: remove when we activate streams after internal preroll in
* streaming mode as well */
GST_LOG_OBJECT (demux, "signalling no more pads");
gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
g_free (data); g_free (data);
return GST_FLOW_OK; return GST_FLOW_OK;
@ -888,7 +896,6 @@ gst_asf_demux_parse_data_object_start (GstASFDemux * demux, guint8 * data)
/* process pending stream objects and create pads for those */ /* process pending stream objects and create pads for those */
gst_asf_demux_process_queued_extended_stream_objects (demux); gst_asf_demux_process_queued_extended_stream_objects (demux);
gst_asf_demux_activate_ext_props_streams (demux); gst_asf_demux_activate_ext_props_streams (demux);
gst_element_no_more_pads (GST_ELEMENT (demux));
GST_INFO_OBJECT (demux, "Stream has %" G_GUINT64_FORMAT " packets, " GST_INFO_OBJECT (demux, "Stream has %" G_GUINT64_FORMAT " packets, "
"data_offset=%" G_GINT64_FORMAT ", data_size=%" G_GINT64_FORMAT "data_offset=%" G_GINT64_FORMAT ", data_size=%" G_GINT64_FORMAT
@ -966,9 +973,127 @@ parse_failed:
} }
} }
static void static gboolean
gst_asf_demux_push_complete_payloads (GstASFDemux * demux) all_streams_prerolled (GstASFDemux * demux)
{ {
GstClockTime preroll_time;
guint i, num_no_data = 0;
preroll_time = demux->preroll * GST_MSECOND;
/* returns TRUE as long as there isn't a stream which (a) has data queued
* and (b) the timestamp of last piece of data queued is < demux->preroll
* AND there is at least one other stream with data queued */
for (i = 0; i < demux->num_streams; ++i) {
AsfPayload *last_payload;
AsfStream *stream;
guint last_idx;
stream = &demux->stream[i];
if (stream->payloads->len == 0) {
++num_no_data;
GST_LOG_OBJECT (stream->pad, "no data queued");
continue;
}
last_idx = stream->payloads->len - 1;
last_payload = &g_array_index (stream->payloads, AsfPayload, last_idx);
GST_LOG_OBJECT (stream->pad, "checking if %" GST_TIME_FORMAT " > %"
GST_TIME_FORMAT, GST_TIME_ARGS (last_payload->ts),
GST_TIME_ARGS (preroll_time));
if (last_payload->ts <= preroll_time) {
GST_LOG_OBJECT (stream->pad, "not beyond preroll point yet");
return FALSE;
}
}
if (num_no_data == demux->num_streams)
return FALSE;
return TRUE;
}
#if 0
static gboolean
gst_asf_demux_have_mutually_exclusive_active_stream (GstASFDemux * demux,
AsfStream * stream)
{
GSList *l;
for (l = demux->mut_ex_streams; l != NULL; l = l->next) {
guint8 *mes;
/* check for each mutual exclusion group whether it affects this stream */
for (mes = (guint8 *) l->data; mes != NULL && *mes != 0xff; ++mes) {
if (*mes == stream->id) {
/* we are in this group; let's check if we've already activated streams
* that are in the same group (and hence mutually exclusive to this
* one) */
for (mes = (guint8 *) l->data; mes != NULL && *mes != 0xff; ++mes) {
guint i;
for (i = 0; i < demux->num_streams; ++i) {
if (demux->stream[i].id == *mes && demux->stream[i].active) {
GST_LOG_OBJECT (demux, "stream with ID %d is mutually exclusive "
"to already active stream with ID %d", stream->id,
demux->stream[i].id);
return TRUE;
}
}
}
/* we can only be in this group once, let's break out and move on to
* the next mutual exclusion group */
break;
}
}
}
return FALSE;
}
#endif
static gboolean
gst_asf_demux_check_activate_streams (GstASFDemux * demux, gboolean force)
{
guint i;
if (demux->activated_streams)
return TRUE;
if (!all_streams_prerolled (demux) && !force) {
GST_DEBUG_OBJECT (demux, "not all streams with data beyond preroll yet");
return FALSE;
}
for (i = 0; i < demux->num_streams; ++i) {
AsfStream *stream = &demux->stream[i];
if (stream->payloads->len > 0) {
/* we don't check mutual exclusion stuff here; either we have data for
* a stream, then we active it, or we don't, then we'll ignore it */
GST_LOG_OBJECT (stream->pad, "is prerolled - activate!");
gst_asf_demux_activate_stream (demux, stream);
} else {
GST_LOG_OBJECT (stream->pad, "no data, ignoring stream");
}
}
demux->activated_streams = TRUE;
GST_LOG_OBJECT (demux, "signalling no more pads");
gst_element_no_more_pads (GST_ELEMENT (demux));
return TRUE;
}
/* returns the stream that has a complete payload with the lowest timestamp
* queued, or NULL (we push things by timestamp because during the internal
* prerolling we might accumulate more data then the external queues can take,
* so we'd lock up if we pushed all accumulated data for stream N in one go) */
static AsfStream *
gst_asf_demux_find_stream_with_complete_payload (GstASFDemux * demux)
{
AsfPayload *best_payload = NULL;
AsfStream *best_stream = NULL;
guint i; guint i;
for (i = 0; i < demux->num_streams; ++i) { for (i = 0; i < demux->num_streams; ++i) {
@ -996,57 +1121,104 @@ gst_asf_demux_push_complete_payloads (GstASFDemux * demux)
} }
} }
while (stream->payloads->len > 0) { /* Now see if there's a complete payload queued for this stream */
if (stream->payloads->len > 0) {
AsfPayload *payload; AsfPayload *payload;
payload = &g_array_index (stream->payloads, AsfPayload, 0); payload = &g_array_index (stream->payloads, AsfPayload, 0);
if (!gst_asf_payload_is_complete (payload)) if (!gst_asf_payload_is_complete (payload))
break; continue;
/* Do we have tags pending for this stream? */ /* ... and whether its timestamp is lower than the current best */
if (stream->pending_tags) { if (best_stream == NULL || best_payload->ts > payload->ts) {
GST_LOG_OBJECT (stream->pad, "%" GST_PTR_FORMAT, stream->pending_tags); best_stream = stream;
gst_element_found_tags_for_pad (GST_ELEMENT (demux), stream->pad, best_payload = payload;
stream->pending_tags);
stream->pending_tags = NULL;
} }
/* We have the whole packet now so we should push the packet to
* the src pad now. First though we should check if we need to do
* descrambling */
if (demux->span > 1) {
gst_asf_demux_descramble_buffer (demux, stream, &payload->buf);
}
payload->buf = gst_buffer_make_metadata_writable (payload->buf);
if (!payload->keyframe) {
GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DELTA_UNIT);
}
if (stream->discont) {
GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DISCONT);
stream->discont = FALSE;
}
gst_buffer_set_caps (payload->buf, stream->caps);
GST_BUFFER_TIMESTAMP (payload->buf) = payload->ts;
GST_BUFFER_DURATION (payload->buf) = payload->duration;
/* FIXME: we should really set durations on buffers if we can */
GST_LOG_OBJECT (stream->pad, "pushing buffer, ts=%" GST_TIME_FORMAT
", dur=%" GST_TIME_FORMAT " size=%u",
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (payload->buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (payload->buf)),
GST_BUFFER_SIZE (payload->buf));
stream->last_flow = gst_pad_push (stream->pad, payload->buf);
payload->buf = NULL;
g_array_remove_index (stream->payloads, 0);
} }
} }
return best_stream;
}
static void
gst_asf_demux_push_complete_payloads (GstASFDemux * demux, gboolean force)
{
AsfStream *stream;
if (G_UNLIKELY (!demux->activated_streams)) {
if (!gst_asf_demux_check_activate_streams (demux, force))
return;
/* streams are now activated */
}
/* do we need to send a newsegment event */
if (demux->need_newsegment) {
if (demux->segment.stop == GST_CLOCK_TIME_NONE &&
demux->segment.duration > 0) {
demux->segment.stop = demux->segment.duration;
}
GST_DEBUG_OBJECT (demux, "sending new-segment event %" GST_SEGMENT_FORMAT,
&demux->segment);
/* note: we fix up all timestamps to start from 0, so this should be ok */
gst_asf_demux_send_event_unlocked (demux,
gst_event_new_new_segment (FALSE, demux->segment.rate,
GST_FORMAT_TIME, demux->segment.start, demux->segment.stop,
demux->segment.start));
demux->need_newsegment = FALSE;
demux->segment_running = TRUE;
}
while ((stream = gst_asf_demux_find_stream_with_complete_payload (demux))) {
AsfPayload *payload;
payload = &g_array_index (stream->payloads, AsfPayload, 0);
/* Do we have tags pending for this stream? */
if (stream->pending_tags) {
GST_LOG_OBJECT (stream->pad, "%" GST_PTR_FORMAT, stream->pending_tags);
gst_element_found_tags_for_pad (GST_ELEMENT (demux), stream->pad,
stream->pending_tags);
stream->pending_tags = NULL;
}
/* We have the whole packet now so we should push the packet to
* the src pad now. First though we should check if we need to do
* descrambling */
if (demux->span > 1) {
gst_asf_demux_descramble_buffer (demux, stream, &payload->buf);
}
payload->buf = gst_buffer_make_metadata_writable (payload->buf);
if (!payload->keyframe) {
GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DELTA_UNIT);
}
if (stream->discont) {
GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DISCONT);
stream->discont = FALSE;
}
gst_buffer_set_caps (payload->buf, stream->caps);
GST_BUFFER_TIMESTAMP (payload->buf) = payload->ts;
GST_BUFFER_DURATION (payload->buf) = payload->duration;
/* FIXME: we should really set durations on buffers if we can */
GST_LOG_OBJECT (stream->pad, "pushing buffer, ts=%" GST_TIME_FORMAT
", dur=%" GST_TIME_FORMAT " size=%u",
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (payload->buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (payload->buf)),
GST_BUFFER_SIZE (payload->buf));
stream->last_flow = gst_pad_push (stream->pad, payload->buf);
payload->buf = NULL;
g_array_remove_index (stream->payloads, 0);
}
} }
static void static void
@ -1083,23 +1255,6 @@ gst_asf_demux_loop (GstASFDemux * demux)
goto read_failed; goto read_failed;
} }
if (demux->need_newsegment) {
if (demux->segment.stop == GST_CLOCK_TIME_NONE)
demux->segment.stop = demux->segment.duration;
GST_DEBUG_OBJECT (demux, "sending new-segment event %" GST_SEGMENT_FORMAT,
&demux->segment);
/* FIXME: check last parameter, streams may have non-zero start */
gst_asf_demux_send_event_unlocked (demux,
gst_event_new_new_segment (FALSE, demux->segment.rate,
GST_FORMAT_TIME, demux->segment.start, demux->segment.stop,
demux->segment.start));
demux->need_newsegment = FALSE;
demux->segment_running = TRUE;
}
/* FIXME: maybe we should just skip broken packets and error out only /* FIXME: maybe we should just skip broken packets and error out only
* after a few broken packets in a row? */ * after a few broken packets in a row? */
if (!gst_asf_demux_parse_packet (demux, buf)) if (!gst_asf_demux_parse_packet (demux, buf))
@ -1107,7 +1262,7 @@ gst_asf_demux_loop (GstASFDemux * demux)
gst_buffer_unref (buf); gst_buffer_unref (buf);
gst_asf_demux_push_complete_payloads (demux); gst_asf_demux_push_complete_payloads (demux, FALSE);
++demux->packet; ++demux->packet;
@ -1128,6 +1283,12 @@ gst_asf_demux_loop (GstASFDemux * demux)
eos: eos:
{ {
/* if we haven't actived our streams yet, this might be because we have
* less data queued than required for preroll; force stream activation and
* send any pending payloads before sending EOS */
if (!demux->activated_streams)
gst_asf_demux_push_complete_payloads (demux, TRUE);
if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) { if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gint64 stop; gint64 stop;
@ -2646,8 +2807,9 @@ gst_asf_demux_activate_ext_props_streams (GstASFDemux * demux)
next: next:
/* FIXME: we should do stream activation based on preroll data */ /* FIXME: we should do stream activation based on preroll data in
if (!is_hidden) * streaming mode too */
if (demux->streaming && !is_hidden)
gst_asf_demux_activate_stream (demux, stream); gst_asf_demux_activate_stream (demux, stream);
} }
} }
@ -2681,8 +2843,12 @@ gst_asf_demux_process_object (GstASFDemux * demux, guint8 ** p_data,
stream = stream =
gst_asf_demux_parse_stream_object (demux, *p_data, obj_data_size); gst_asf_demux_parse_stream_object (demux, *p_data, obj_data_size);
if (stream)
/* FIXME: we should do stream activation based on preroll data in
* streaming mode too */
if (demux->streaming && stream != NULL)
gst_asf_demux_activate_stream (demux, stream); gst_asf_demux_activate_stream (demux, stream);
ret = GST_FLOW_OK; ret = GST_FLOW_OK;
break; break;
} }

View file

@ -149,6 +149,7 @@ struct _GstASFDemux {
guint32 num_video_streams; guint32 num_video_streams;
guint32 num_streams; guint32 num_streams;
AsfStream stream[GST_ASF_DEMUX_NUM_STREAMS]; AsfStream stream[GST_ASF_DEMUX_NUM_STREAMS];
gboolean activated_streams;
GstClockTime first_ts; /* first timestamp found */ GstClockTime first_ts; /* first timestamp found */