diff --git a/subprojects/gst-plugins-base/gst/playback/gststreamsynchronizer.c b/subprojects/gst-plugins-base/gst/playback/gststreamsynchronizer.c index 481579ba63..b3e964f47b 100644 --- a/subprojects/gst-plugins-base/gst/playback/gststreamsynchronizer.c +++ b/subprojects/gst-plugins-base/gst/playback/gststreamsynchronizer.c @@ -361,6 +361,370 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad) return TRUE; } +static void +gst_stream_synchronizer_handle_stream_start (GstStreamSynchronizer * self, + GstPad * pad, GstEvent * event) +{ + GstSyncStream *stream, *ostream; + guint32 seqnum = gst_event_get_seqnum (event); + guint group_id; + gboolean have_group_id; + GList *l; + gboolean all_wait = TRUE; + gboolean new_stream = TRUE; + + have_group_id = gst_event_parse_group_id (event, &group_id); + + GST_STREAM_SYNCHRONIZER_LOCK (self); + self->have_group_id &= have_group_id; + have_group_id = self->have_group_id; + self->eos = FALSE; + + stream = gst_streamsync_pad_get_stream (pad); + + gst_event_parse_stream_flags (event, &stream->flags); + + if ((have_group_id && stream->group_id != group_id) || (!have_group_id + && stream->stream_start_seqnum != seqnum)) { + stream->is_eos = FALSE; + stream->eos_sent = FALSE; + stream->flushing = FALSE; + stream->stream_start_seqnum = seqnum; + stream->group_id = group_id; + + if (!have_group_id) { + /* Check if this belongs to a stream that is already there, + * e.g. we got the visualizations for an audio stream */ + for (l = self->streams; l; l = l->next) { + ostream = l->data; + + if (ostream != stream && ostream->stream_start_seqnum == seqnum + && !ostream->wait) { + new_stream = FALSE; + break; + } + } + + if (!new_stream) { + GST_DEBUG_OBJECT (pad, + "Stream %d belongs to running stream %d, no waiting", + stream->stream_number, ostream->stream_number); + stream->wait = FALSE; + gst_syncstream_unref (stream); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + return; + } + } else if (group_id == self->group_id) { + GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, " + "no waiting", stream->stream_number, group_id); + gst_syncstream_unref (stream); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + return; + } + + GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number); + + stream->wait = TRUE; + + for (l = self->streams; l; l = l->next) { + GstSyncStream *ostream = l->data; + + all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE) + || (ostream->wait && (!have_group_id + || ostream->group_id == group_id))); + if (!all_wait) + break; + } + + if (all_wait) { + gint64 position = 0; + + if (have_group_id) + GST_DEBUG_OBJECT (self, + "All streams have changed to group id %u -- unblocking", group_id); + else + GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking"); + + self->group_id = group_id; + + for (l = self->streams; l; l = l->next) { + GstSyncStream *ostream = l->data; + gint64 stop_running_time; + gint64 position_running_time; + + ostream->wait = FALSE; + + if (ostream->segment.format == GST_FORMAT_TIME) { + if (ostream->segment.rate > 0) + stop_running_time = + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.stop); + else + stop_running_time = + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.start); + + position_running_time = + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.position); + + position_running_time = + MAX (position_running_time, stop_running_time); + + if (ostream->segment.rate > 0) + position_running_time -= + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.start); + else + position_running_time -= + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.stop); + + position_running_time = MAX (0, position_running_time); + + position = MAX (position, position_running_time); + } + } + + self->group_start_time += position; + + GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (self->group_start_time)); + + for (l = self->streams; l; l = l->next) { + GstSyncStream *ostream = l->data; + ostream->wait = FALSE; + g_cond_broadcast (&ostream->stream_finish_cond); + } + } + } + + gst_syncstream_unref (stream); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + +} + +/* Returns FALSE if the event was handled and shouldn't be propagated */ +static gboolean +gst_stream_synchronizer_handle_segment (GstStreamSynchronizer * self, + GstPad * pad, GstEvent ** event) +{ + GstSyncStream *stream; + GstSegment segment; + + gst_event_copy_segment (*event, &segment); + + GST_STREAM_SYNCHRONIZER_LOCK (self); + + gst_stream_synchronizer_wait (self, pad); + + if (self->shutdown) { + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + gst_event_unref (*event); + return FALSE; + } + + stream = gst_streamsync_pad_get_stream (pad); + if (segment.format == GST_FORMAT_TIME) { + GST_DEBUG_OBJECT (pad, + "New stream, updating base from %" GST_TIME_FORMAT " to %" + GST_TIME_FORMAT, GST_TIME_ARGS (segment.base), + GST_TIME_ARGS (segment.base + self->group_start_time)); + segment.base += self->group_start_time; + + GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT, + &stream->segment); + gst_segment_copy_into (&segment, &stream->segment); + GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT, + &stream->segment); + stream->segment_seqnum = gst_event_get_seqnum (*event); + + GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (stream->segment.base)); + { + GstEvent *tmpev; + + tmpev = gst_event_new_segment (&stream->segment); + gst_event_set_seqnum (tmpev, stream->segment_seqnum); + gst_event_unref (*event); + *event = tmpev; + } + } else if (stream) { + GST_WARNING_OBJECT (pad, "Non-TIME segment: %s", + gst_format_get_name (segment.format)); + gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); + } + gst_syncstream_unref (stream); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + + return TRUE; +} + +static void +gst_stream_synchronizer_handle_flush_stop (GstStreamSynchronizer * self, + GstPad * pad, GstEvent * event) +{ + GstSyncStream *stream; + GList *l; + GstClockTime new_group_start_time = 0; + gboolean reset_time; + + gst_event_parse_flush_stop (event, &reset_time); + + GST_STREAM_SYNCHRONIZER_LOCK (self); + + stream = gst_streamsync_pad_get_stream (pad); + + if (reset_time) { + GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d", + stream->stream_number); + gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); + } + + stream->is_eos = FALSE; + stream->eos_sent = FALSE; + stream->flushing = FALSE; + stream->wait = FALSE; + g_cond_broadcast (&stream->stream_finish_cond); + + if (reset_time) { + for (l = self->streams; l; l = l->next) { + GstSyncStream *ostream = l->data; + GstClockTime start_running_time; + + if (ostream == stream || ostream->flushing) + continue; + + if (ostream->segment.format == GST_FORMAT_TIME) { + if (ostream->segment.rate > 0) + start_running_time = + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.start); + else + start_running_time = + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.stop); + + new_group_start_time = MAX (new_group_start_time, start_running_time); + } + } + + GST_DEBUG_OBJECT (pad, + "Updating group start time from %" GST_TIME_FORMAT " to %" + GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time), + GST_TIME_ARGS (new_group_start_time)); + self->group_start_time = new_group_start_time; + } + + gst_syncstream_unref (stream); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + +} + +static gboolean +gst_stream_synchronizer_handle_eos (GstStreamSynchronizer * self, GstPad * pad, + GstEvent * event) +{ + gboolean ret = FALSE; + GstSyncStream *stream; + GList *l; + gboolean all_eos = TRUE; + gboolean seen_data; + GSList *pads = NULL; + GstPad *srcpad; + GstClockTime timestamp; + guint32 seqnum; + + GST_STREAM_SYNCHRONIZER_LOCK (self); + stream = gst_streamsync_pad_get_stream (pad); + + GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number); + stream->is_eos = TRUE; + + seen_data = stream->seen_data; + srcpad = gst_object_ref (stream->srcpad); + seqnum = stream->segment_seqnum; + + if (seen_data && stream->segment.position != -1) + timestamp = stream->segment.position; + else if (stream->segment.rate < 0.0 || stream->segment.stop == -1) + timestamp = stream->segment.start; + else + timestamp = stream->segment.stop; + + stream->segment.position = timestamp; + + for (l = self->streams; l; l = l->next) { + GstSyncStream *ostream = l->data; + + all_eos = all_eos && ostream->is_eos; + if (!all_eos) + break; + } + + if (all_eos) { + GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding"); + self->eos = TRUE; + for (l = self->streams; l; l = l->next) { + GstSyncStream *ostream = l->data; + /* local snapshot of current pads */ + gst_object_ref (ostream->srcpad); + pads = g_slist_prepend (pads, ostream->srcpad); + } + } + if (pads) { + GstPad *pad; + GSList *epad; + GstSyncStream *ostream; + + ret = TRUE; + epad = pads; + while (epad) { + pad = epad->data; + ostream = gst_streamsync_pad_get_stream (pad); + g_cond_broadcast (&ostream->stream_finish_cond); + gst_syncstream_unref (ostream); + gst_object_unref (pad); + epad = g_slist_next (epad); + } + g_slist_free (pads); + } else { + if (seen_data) { + stream->send_gap_event = TRUE; + stream->gap_duration = GST_CLOCK_TIME_NONE; + stream->wait = TRUE; + ret = gst_stream_synchronizer_wait (self, srcpad); + } + } + + /* send eos if haven't seen data. seen_data will be true if data buffer + * of the track have received in anytime. sink is ready if seen_data is + * true, so can send GAP event. Will send EOS if sink isn't ready. The + * scenario for the case is one track haven't any media data and then + * send EOS. Or no any valid media data in one track, so decoder can't + * get valid CAPS for the track. sink can't ready without received CAPS.*/ + if (!seen_data || self->eos) { + GstEvent *topush; + GST_DEBUG_OBJECT (pad, "send EOS event"); + /* drop lock when sending eos, which may block in e.g. preroll */ + topush = gst_event_new_eos (); + gst_event_set_seqnum (topush, seqnum); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + ret = gst_pad_push_event (srcpad, topush); + GST_STREAM_SYNCHRONIZER_LOCK (self); + stream = gst_streamsync_pad_get_stream (pad); + stream->eos_sent = TRUE; + gst_syncstream_unref (stream); + } + + gst_object_unref (srcpad); + gst_event_unref (event); + gst_syncstream_unref (stream); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + + return ret; +} + /* sinkpad functions */ static gboolean gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, @@ -375,193 +739,12 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, switch (GST_EVENT_TYPE (event)) { case GST_EVENT_STREAM_START: { - GstSyncStream *stream, *ostream; - guint32 seqnum = gst_event_get_seqnum (event); - guint group_id; - gboolean have_group_id; - GList *l; - gboolean all_wait = TRUE; - gboolean new_stream = TRUE; - - have_group_id = gst_event_parse_group_id (event, &group_id); - - GST_STREAM_SYNCHRONIZER_LOCK (self); - self->have_group_id &= have_group_id; - have_group_id = self->have_group_id; - self->eos = FALSE; - - stream = gst_streamsync_pad_get_stream (pad); - - gst_event_parse_stream_flags (event, &stream->flags); - - if ((have_group_id && stream->group_id != group_id) || (!have_group_id - && stream->stream_start_seqnum != seqnum)) { - stream->is_eos = FALSE; - stream->eos_sent = FALSE; - stream->flushing = FALSE; - stream->stream_start_seqnum = seqnum; - stream->group_id = group_id; - - if (!have_group_id) { - /* Check if this belongs to a stream that is already there, - * e.g. we got the visualizations for an audio stream */ - for (l = self->streams; l; l = l->next) { - ostream = l->data; - - if (ostream != stream && ostream->stream_start_seqnum == seqnum - && !ostream->wait) { - new_stream = FALSE; - break; - } - } - - if (!new_stream) { - GST_DEBUG_OBJECT (pad, - "Stream %d belongs to running stream %d, no waiting", - stream->stream_number, ostream->stream_number); - stream->wait = FALSE; - gst_syncstream_unref (stream); - GST_STREAM_SYNCHRONIZER_UNLOCK (self); - break; - } - } else if (group_id == self->group_id) { - GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, " - "no waiting", stream->stream_number, group_id); - gst_syncstream_unref (stream); - GST_STREAM_SYNCHRONIZER_UNLOCK (self); - break; - } - - GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number); - - stream->wait = TRUE; - - for (l = self->streams; l; l = l->next) { - GstSyncStream *ostream = l->data; - - all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE) - || (ostream->wait && (!have_group_id - || ostream->group_id == group_id))); - if (!all_wait) - break; - } - - if (all_wait) { - gint64 position = 0; - - if (have_group_id) - GST_DEBUG_OBJECT (self, - "All streams have changed to group id %u -- unblocking", - group_id); - else - GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking"); - - self->group_id = group_id; - - for (l = self->streams; l; l = l->next) { - GstSyncStream *ostream = l->data; - gint64 stop_running_time; - gint64 position_running_time; - - ostream->wait = FALSE; - - if (ostream->segment.format == GST_FORMAT_TIME) { - if (ostream->segment.rate > 0) - stop_running_time = - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.stop); - else - stop_running_time = - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.start); - - position_running_time = - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.position); - - position_running_time = - MAX (position_running_time, stop_running_time); - - if (ostream->segment.rate > 0) - position_running_time -= - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.start); - else - position_running_time -= - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.stop); - - position_running_time = MAX (0, position_running_time); - - position = MAX (position, position_running_time); - } - } - - self->group_start_time += position; - - GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT, - GST_TIME_ARGS (self->group_start_time)); - - for (l = self->streams; l; l = l->next) { - GstSyncStream *ostream = l->data; - ostream->wait = FALSE; - g_cond_broadcast (&ostream->stream_finish_cond); - } - } - } - - gst_syncstream_unref (stream); - GST_STREAM_SYNCHRONIZER_UNLOCK (self); + gst_stream_synchronizer_handle_stream_start (self, pad, event); break; } case GST_EVENT_SEGMENT:{ - GstSyncStream *stream; - GstSegment segment; - - gst_event_copy_segment (event, &segment); - - GST_STREAM_SYNCHRONIZER_LOCK (self); - - gst_stream_synchronizer_wait (self, pad); - - if (self->shutdown) { - GST_STREAM_SYNCHRONIZER_UNLOCK (self); - gst_event_unref (event); + if (!gst_stream_synchronizer_handle_segment (self, pad, &event)) goto done; - } - - stream = gst_streamsync_pad_get_stream (pad); - if (segment.format == GST_FORMAT_TIME) { - GST_DEBUG_OBJECT (pad, - "New stream, updating base from %" GST_TIME_FORMAT " to %" - GST_TIME_FORMAT, GST_TIME_ARGS (segment.base), - GST_TIME_ARGS (segment.base + self->group_start_time)); - segment.base += self->group_start_time; - - GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT, - &stream->segment); - gst_segment_copy_into (&segment, &stream->segment); - GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT, - &stream->segment); - stream->segment_seqnum = gst_event_get_seqnum (event); - - GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT, - GST_TIME_ARGS (stream->segment.base)); - { - GstEvent *tmpev; - - tmpev = gst_event_new_segment (&stream->segment); - gst_event_set_seqnum (tmpev, stream->segment_seqnum); - gst_event_unref (event); - event = tmpev; - } - } else if (stream) { - GST_WARNING_OBJECT (pad, "Non-TIME segment: %s", - gst_format_get_name (segment.format)); - gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); - } - gst_syncstream_unref (stream); - GST_STREAM_SYNCHRONIZER_UNLOCK (self); break; } case GST_EVENT_FLUSH_START:{ @@ -578,61 +761,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, break; } case GST_EVENT_FLUSH_STOP:{ - GstSyncStream *stream; - GList *l; - GstClockTime new_group_start_time = 0; - gboolean reset_time; - - gst_event_parse_flush_stop (event, &reset_time); - - GST_STREAM_SYNCHRONIZER_LOCK (self); - - stream = gst_streamsync_pad_get_stream (pad); - - if (reset_time) { - GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d", - stream->stream_number); - gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); - } - - stream->is_eos = FALSE; - stream->eos_sent = FALSE; - stream->flushing = FALSE; - stream->wait = FALSE; - g_cond_broadcast (&stream->stream_finish_cond); - - if (reset_time) { - for (l = self->streams; l; l = l->next) { - GstSyncStream *ostream = l->data; - GstClockTime start_running_time; - - if (ostream == stream || ostream->flushing) - continue; - - if (ostream->segment.format == GST_FORMAT_TIME) { - if (ostream->segment.rate > 0) - start_running_time = - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.start); - else - start_running_time = - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.stop); - - new_group_start_time = - MAX (new_group_start_time, start_running_time); - } - } - - GST_DEBUG_OBJECT (pad, - "Updating group start time from %" GST_TIME_FORMAT " to %" - GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time), - GST_TIME_ARGS (new_group_start_time)); - self->group_start_time = new_group_start_time; - } - - gst_syncstream_unref (stream); - GST_STREAM_SYNCHRONIZER_UNLOCK (self); + gst_stream_synchronizer_handle_flush_stop (self, pad, event); break; } /* unblocking EOS wait when track switch. */ @@ -654,101 +783,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, break; } case GST_EVENT_EOS:{ - GstSyncStream *stream; - GList *l; - gboolean all_eos = TRUE; - gboolean seen_data; - GSList *pads = NULL; - GstPad *srcpad; - GstClockTime timestamp; - guint32 seqnum; - - GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_streamsync_pad_get_stream (pad); - - GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number); - stream->is_eos = TRUE; - - seen_data = stream->seen_data; - srcpad = gst_object_ref (stream->srcpad); - seqnum = stream->segment_seqnum; - - if (seen_data && stream->segment.position != -1) - timestamp = stream->segment.position; - else if (stream->segment.rate < 0.0 || stream->segment.stop == -1) - timestamp = stream->segment.start; - else - timestamp = stream->segment.stop; - - stream->segment.position = timestamp; - - for (l = self->streams; l; l = l->next) { - GstSyncStream *ostream = l->data; - - all_eos = all_eos && ostream->is_eos; - if (!all_eos) - break; - } - - if (all_eos) { - GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding"); - self->eos = TRUE; - for (l = self->streams; l; l = l->next) { - GstSyncStream *ostream = l->data; - /* local snapshot of current pads */ - gst_object_ref (ostream->srcpad); - pads = g_slist_prepend (pads, ostream->srcpad); - } - } - if (pads) { - GstPad *pad; - GSList *epad; - GstSyncStream *ostream; - - ret = TRUE; - epad = pads; - while (epad) { - pad = epad->data; - ostream = gst_streamsync_pad_get_stream (pad); - g_cond_broadcast (&ostream->stream_finish_cond); - gst_syncstream_unref (ostream); - gst_object_unref (pad); - epad = g_slist_next (epad); - } - g_slist_free (pads); - } else { - if (seen_data) { - stream->send_gap_event = TRUE; - stream->gap_duration = GST_CLOCK_TIME_NONE; - stream->wait = TRUE; - ret = gst_stream_synchronizer_wait (self, srcpad); - } - } - - /* send eos if haven't seen data. seen_data will be true if data buffer - * of the track have received in anytime. sink is ready if seen_data is - * true, so can send GAP event. Will send EOS if sink isn't ready. The - * scenario for the case is one track haven't any media data and then - * send EOS. Or no any valid media data in one track, so decoder can't - * get valid CAPS for the track. sink can't ready without received CAPS.*/ - if (!seen_data || self->eos) { - GstEvent *topush; - GST_DEBUG_OBJECT (pad, "send EOS event"); - /* drop lock when sending eos, which may block in e.g. preroll */ - topush = gst_event_new_eos (); - gst_event_set_seqnum (topush, seqnum); - GST_STREAM_SYNCHRONIZER_UNLOCK (self); - ret = gst_pad_push_event (srcpad, topush); - GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_streamsync_pad_get_stream (pad); - stream->eos_sent = TRUE; - gst_syncstream_unref (stream); - } - - gst_object_unref (srcpad); - gst_event_unref (event); - gst_syncstream_unref (stream); - GST_STREAM_SYNCHRONIZER_UNLOCK (self); + ret = gst_stream_synchronizer_handle_eos (self, pad, event); goto done; } default: