From 08c63b68002a55739180edd1d8d0754c055429e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim-Philipp=20M=C3=BCller?= Date: Mon, 7 May 2007 13:51:43 +0000 Subject: [PATCH] gst/asfdemux/gstasfdemux.*: Activate streams (ie. add the pads to the element) depending on whether we actually get d... Original commit message from CVS: * gst/asfdemux/gstasfdemux.c: (gst_asf_demux_reset), (gst_asf_demux_chain_headers), (gst_asf_demux_parse_data_object_start), (all_streams_prerolled), (gst_asf_demux_have_mutually_exclusive_active_stream), (gst_asf_demux_check_activate_streams), (gst_asf_demux_find_stream_with_complete_payload), (gst_asf_demux_push_complete_payloads), (gst_asf_demux_loop), (gst_asf_demux_activate_ext_props_streams), (gst_asf_demux_process_object): * gst/asfdemux/gstasfdemux.h: Activate streams (ie. add the pads to the element) depending on whether we actually get data for those streams within the ASF preroll value specified. Currently only done in pull-mode though (this will fix problems with playbin hanging on mms streams once we use this in push-mode as well). --- ChangeLog | 18 +++ gst/asfdemux/gstasfdemux.c | 300 ++++++++++++++++++++++++++++--------- gst/asfdemux/gstasfdemux.h | 1 + 3 files changed, 252 insertions(+), 67 deletions(-) diff --git a/ChangeLog b/ChangeLog index 3a3ffe274d..090b56e0cd 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +2007-05-07 Tim-Philipp Müller + + * gst/asfdemux/gstasfdemux.c: (gst_asf_demux_reset), + (gst_asf_demux_chain_headers), + (gst_asf_demux_parse_data_object_start), (all_streams_prerolled), + (gst_asf_demux_have_mutually_exclusive_active_stream), + (gst_asf_demux_check_activate_streams), + (gst_asf_demux_find_stream_with_complete_payload), + (gst_asf_demux_push_complete_payloads), (gst_asf_demux_loop), + (gst_asf_demux_activate_ext_props_streams), + (gst_asf_demux_process_object): + * gst/asfdemux/gstasfdemux.h: + Activate streams (ie. add the pads to the element) depending on + whether we actually get data for those streams within the ASF + preroll value specified. Currently only done in pull-mode though + (this will fix problems with playbin hanging on mms streams once + we use this in push-mode as well). + 2007-05-04 Tim-Philipp Müller * gst/asfdemux/asfpacket.c: (gst_asf_payload_queue_for_stream): diff --git a/gst/asfdemux/gstasfdemux.c b/gst/asfdemux/gstasfdemux.c index f184ea0d62..daa5198f21 100644 --- a/gst/asfdemux/gstasfdemux.c +++ b/gst/asfdemux/gstasfdemux.c @@ -107,6 +107,8 @@ static gboolean gst_asf_demux_parse_data_object_start (GstASFDemux * demux, guint8 * data); static void gst_asf_demux_descramble_buffer (GstASFDemux * demux, AsfStream * stream, GstBuffer ** p_buffer); +static void gst_asf_demux_activate_stream (GstASFDemux * demux, + AsfStream * stream); GST_BOILERPLATE (GstASFDemux, gst_asf_demux, GstElement, GST_TYPE_ELEMENT); @@ -205,6 +207,7 @@ gst_asf_demux_reset (GstASFDemux * demux) demux->num_audio_streams = 0; demux->num_video_streams = 0; demux->num_streams = 0; + demux->activated_streams = FALSE; demux->first_ts = GST_CLOCK_TIME_NONE; demux->state = GST_ASF_DEMUX_STATE_HEADER; demux->seekable = FALSE; @@ -648,6 +651,11 @@ gst_asf_demux_chain_headers (GstASFDemux * demux) if (demux->num_streams == 0) goto no_streams; + /* FIXME: remove when we activate streams after internal preroll in + * streaming mode as well */ + GST_LOG_OBJECT (demux, "signalling no more pads"); + gst_element_no_more_pads (GST_ELEMENT_CAST (demux)); + g_free (data); return GST_FLOW_OK; @@ -888,7 +896,6 @@ gst_asf_demux_parse_data_object_start (GstASFDemux * demux, guint8 * data) /* process pending stream objects and create pads for those */ gst_asf_demux_process_queued_extended_stream_objects (demux); gst_asf_demux_activate_ext_props_streams (demux); - gst_element_no_more_pads (GST_ELEMENT (demux)); GST_INFO_OBJECT (demux, "Stream has %" G_GUINT64_FORMAT " packets, " "data_offset=%" G_GINT64_FORMAT ", data_size=%" G_GINT64_FORMAT @@ -966,9 +973,127 @@ parse_failed: } } -static void -gst_asf_demux_push_complete_payloads (GstASFDemux * demux) +static gboolean +all_streams_prerolled (GstASFDemux * demux) { + GstClockTime preroll_time; + guint i, num_no_data = 0; + + preroll_time = demux->preroll * GST_MSECOND; + + /* returns TRUE as long as there isn't a stream which (a) has data queued + * and (b) the timestamp of last piece of data queued is < demux->preroll + * AND there is at least one other stream with data queued */ + for (i = 0; i < demux->num_streams; ++i) { + AsfPayload *last_payload; + AsfStream *stream; + guint last_idx; + + stream = &demux->stream[i]; + if (stream->payloads->len == 0) { + ++num_no_data; + GST_LOG_OBJECT (stream->pad, "no data queued"); + continue; + } + + last_idx = stream->payloads->len - 1; + last_payload = &g_array_index (stream->payloads, AsfPayload, last_idx); + + GST_LOG_OBJECT (stream->pad, "checking if %" GST_TIME_FORMAT " > %" + GST_TIME_FORMAT, GST_TIME_ARGS (last_payload->ts), + GST_TIME_ARGS (preroll_time)); + if (last_payload->ts <= preroll_time) { + GST_LOG_OBJECT (stream->pad, "not beyond preroll point yet"); + return FALSE; + } + } + + if (num_no_data == demux->num_streams) + return FALSE; + + return TRUE; +} + +#if 0 +static gboolean +gst_asf_demux_have_mutually_exclusive_active_stream (GstASFDemux * demux, + AsfStream * stream) +{ + GSList *l; + + for (l = demux->mut_ex_streams; l != NULL; l = l->next) { + guint8 *mes; + + /* check for each mutual exclusion group whether it affects this stream */ + for (mes = (guint8 *) l->data; mes != NULL && *mes != 0xff; ++mes) { + if (*mes == stream->id) { + /* we are in this group; let's check if we've already activated streams + * that are in the same group (and hence mutually exclusive to this + * one) */ + for (mes = (guint8 *) l->data; mes != NULL && *mes != 0xff; ++mes) { + guint i; + + for (i = 0; i < demux->num_streams; ++i) { + if (demux->stream[i].id == *mes && demux->stream[i].active) { + GST_LOG_OBJECT (demux, "stream with ID %d is mutually exclusive " + "to already active stream with ID %d", stream->id, + demux->stream[i].id); + return TRUE; + } + } + } + /* we can only be in this group once, let's break out and move on to + * the next mutual exclusion group */ + break; + } + } + } + + return FALSE; +} +#endif + +static gboolean +gst_asf_demux_check_activate_streams (GstASFDemux * demux, gboolean force) +{ + guint i; + + if (demux->activated_streams) + return TRUE; + + if (!all_streams_prerolled (demux) && !force) { + GST_DEBUG_OBJECT (demux, "not all streams with data beyond preroll yet"); + return FALSE; + } + + for (i = 0; i < demux->num_streams; ++i) { + AsfStream *stream = &demux->stream[i]; + + if (stream->payloads->len > 0) { + /* we don't check mutual exclusion stuff here; either we have data for + * a stream, then we active it, or we don't, then we'll ignore it */ + GST_LOG_OBJECT (stream->pad, "is prerolled - activate!"); + gst_asf_demux_activate_stream (demux, stream); + } else { + GST_LOG_OBJECT (stream->pad, "no data, ignoring stream"); + } + } + + demux->activated_streams = TRUE; + GST_LOG_OBJECT (demux, "signalling no more pads"); + gst_element_no_more_pads (GST_ELEMENT (demux)); + return TRUE; +} + +/* returns the stream that has a complete payload with the lowest timestamp + * queued, or NULL (we push things by timestamp because during the internal + * prerolling we might accumulate more data then the external queues can take, + * so we'd lock up if we pushed all accumulated data for stream N in one go) */ +static AsfStream * +gst_asf_demux_find_stream_with_complete_payload (GstASFDemux * demux) +{ + AsfPayload *best_payload = NULL; + AsfStream *best_stream = NULL; guint i; for (i = 0; i < demux->num_streams; ++i) { @@ -996,57 +1121,104 @@ gst_asf_demux_push_complete_payloads (GstASFDemux * demux) } } - while (stream->payloads->len > 0) { + /* Now see if there's a complete payload queued for this stream */ + if (stream->payloads->len > 0) { AsfPayload *payload; payload = &g_array_index (stream->payloads, AsfPayload, 0); if (!gst_asf_payload_is_complete (payload)) - break; + continue; - /* Do we have tags pending for this stream? */ - if (stream->pending_tags) { - GST_LOG_OBJECT (stream->pad, "%" GST_PTR_FORMAT, stream->pending_tags); - gst_element_found_tags_for_pad (GST_ELEMENT (demux), stream->pad, - stream->pending_tags); - stream->pending_tags = NULL; + /* ... and whether its timestamp is lower than the current best */ + if (best_stream == NULL || best_payload->ts > payload->ts) { + best_stream = stream; + best_payload = payload; } - - /* We have the whole packet now so we should push the packet to - * the src pad now. First though we should check if we need to do - * descrambling */ - if (demux->span > 1) { - gst_asf_demux_descramble_buffer (demux, stream, &payload->buf); - } - - payload->buf = gst_buffer_make_metadata_writable (payload->buf); - - if (!payload->keyframe) { - GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DELTA_UNIT); - } - - if (stream->discont) { - GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DISCONT); - stream->discont = FALSE; - } - - gst_buffer_set_caps (payload->buf, stream->caps); - - GST_BUFFER_TIMESTAMP (payload->buf) = payload->ts; - GST_BUFFER_DURATION (payload->buf) = payload->duration; - - /* FIXME: we should really set durations on buffers if we can */ - - GST_LOG_OBJECT (stream->pad, "pushing buffer, ts=%" GST_TIME_FORMAT - ", dur=%" GST_TIME_FORMAT " size=%u", - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (payload->buf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (payload->buf)), - GST_BUFFER_SIZE (payload->buf)); - - stream->last_flow = gst_pad_push (stream->pad, payload->buf); - payload->buf = NULL; - g_array_remove_index (stream->payloads, 0); } } + + return best_stream; +} + +static void +gst_asf_demux_push_complete_payloads (GstASFDemux * demux, gboolean force) +{ + AsfStream *stream; + + if (G_UNLIKELY (!demux->activated_streams)) { + if (!gst_asf_demux_check_activate_streams (demux, force)) + return; + /* streams are now activated */ + } + + /* do we need to send a newsegment event */ + if (demux->need_newsegment) { + if (demux->segment.stop == GST_CLOCK_TIME_NONE && + demux->segment.duration > 0) { + demux->segment.stop = demux->segment.duration; + } + + GST_DEBUG_OBJECT (demux, "sending new-segment event %" GST_SEGMENT_FORMAT, + &demux->segment); + + /* note: we fix up all timestamps to start from 0, so this should be ok */ + gst_asf_demux_send_event_unlocked (demux, + gst_event_new_new_segment (FALSE, demux->segment.rate, + GST_FORMAT_TIME, demux->segment.start, demux->segment.stop, + demux->segment.start)); + + demux->need_newsegment = FALSE; + demux->segment_running = TRUE; + } + + while ((stream = gst_asf_demux_find_stream_with_complete_payload (demux))) { + AsfPayload *payload; + + payload = &g_array_index (stream->payloads, AsfPayload, 0); + + /* Do we have tags pending for this stream? */ + if (stream->pending_tags) { + GST_LOG_OBJECT (stream->pad, "%" GST_PTR_FORMAT, stream->pending_tags); + gst_element_found_tags_for_pad (GST_ELEMENT (demux), stream->pad, + stream->pending_tags); + stream->pending_tags = NULL; + } + + /* We have the whole packet now so we should push the packet to + * the src pad now. First though we should check if we need to do + * descrambling */ + if (demux->span > 1) { + gst_asf_demux_descramble_buffer (demux, stream, &payload->buf); + } + + payload->buf = gst_buffer_make_metadata_writable (payload->buf); + + if (!payload->keyframe) { + GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DELTA_UNIT); + } + + if (stream->discont) { + GST_BUFFER_FLAG_SET (payload->buf, GST_BUFFER_FLAG_DISCONT); + stream->discont = FALSE; + } + + gst_buffer_set_caps (payload->buf, stream->caps); + + GST_BUFFER_TIMESTAMP (payload->buf) = payload->ts; + GST_BUFFER_DURATION (payload->buf) = payload->duration; + + /* FIXME: we should really set durations on buffers if we can */ + + GST_LOG_OBJECT (stream->pad, "pushing buffer, ts=%" GST_TIME_FORMAT + ", dur=%" GST_TIME_FORMAT " size=%u", + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (payload->buf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (payload->buf)), + GST_BUFFER_SIZE (payload->buf)); + + stream->last_flow = gst_pad_push (stream->pad, payload->buf); + payload->buf = NULL; + g_array_remove_index (stream->payloads, 0); + } } static void @@ -1083,23 +1255,6 @@ gst_asf_demux_loop (GstASFDemux * demux) goto read_failed; } - if (demux->need_newsegment) { - if (demux->segment.stop == GST_CLOCK_TIME_NONE) - demux->segment.stop = demux->segment.duration; - - GST_DEBUG_OBJECT (demux, "sending new-segment event %" GST_SEGMENT_FORMAT, - &demux->segment); - - /* FIXME: check last parameter, streams may have non-zero start */ - gst_asf_demux_send_event_unlocked (demux, - gst_event_new_new_segment (FALSE, demux->segment.rate, - GST_FORMAT_TIME, demux->segment.start, demux->segment.stop, - demux->segment.start)); - - demux->need_newsegment = FALSE; - demux->segment_running = TRUE; - } - /* FIXME: maybe we should just skip broken packets and error out only * after a few broken packets in a row? */ if (!gst_asf_demux_parse_packet (demux, buf)) @@ -1107,7 +1262,7 @@ gst_asf_demux_loop (GstASFDemux * demux) gst_buffer_unref (buf); - gst_asf_demux_push_complete_payloads (demux); + gst_asf_demux_push_complete_payloads (demux, FALSE); ++demux->packet; @@ -1128,6 +1283,12 @@ gst_asf_demux_loop (GstASFDemux * demux) eos: { + /* if we haven't actived our streams yet, this might be because we have + * less data queued than required for preroll; force stream activation and + * send any pending payloads before sending EOS */ + if (!demux->activated_streams) + gst_asf_demux_push_complete_payloads (demux, TRUE); + if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) { gint64 stop; @@ -2646,8 +2807,9 @@ gst_asf_demux_activate_ext_props_streams (GstASFDemux * demux) next: - /* FIXME: we should do stream activation based on preroll data */ - if (!is_hidden) + /* FIXME: we should do stream activation based on preroll data in + * streaming mode too */ + if (demux->streaming && !is_hidden) gst_asf_demux_activate_stream (demux, stream); } } @@ -2681,8 +2843,12 @@ gst_asf_demux_process_object (GstASFDemux * demux, guint8 ** p_data, stream = gst_asf_demux_parse_stream_object (demux, *p_data, obj_data_size); - if (stream) + + /* FIXME: we should do stream activation based on preroll data in + * streaming mode too */ + if (demux->streaming && stream != NULL) gst_asf_demux_activate_stream (demux, stream); + ret = GST_FLOW_OK; break; } diff --git a/gst/asfdemux/gstasfdemux.h b/gst/asfdemux/gstasfdemux.h index a1139334dd..413b6e9494 100644 --- a/gst/asfdemux/gstasfdemux.h +++ b/gst/asfdemux/gstasfdemux.h @@ -149,6 +149,7 @@ struct _GstASFDemux { guint32 num_video_streams; guint32 num_streams; AsfStream stream[GST_ASF_DEMUX_NUM_STREAMS]; + gboolean activated_streams; GstClockTime first_ts; /* first timestamp found */