From 2465895266b17d4a1c438d73f84e68d9252ec578 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 22 Jul 2013 13:15:09 +0200 Subject: [PATCH] streamsynchronizer: Implement grouping of streams via the group-id https://bugzilla.gnome.org/show_bug.cgi?id=704427 https://bugzilla.gnome.org/show_bug.cgi?id=704408 --- gst/playback/gststreamsynchronizer.c | 151 +++++++++++++++++---------- gst/playback/gststreamsynchronizer.h | 3 + 2 files changed, 99 insertions(+), 55 deletions(-) diff --git a/gst/playback/gststreamsynchronizer.c b/gst/playback/gststreamsynchronizer.c index a5eb53037f..69f163f5e6 100644 --- a/gst/playback/gststreamsynchronizer.c +++ b/gst/playback/gststreamsynchronizer.c @@ -76,6 +76,7 @@ typedef struct * default: G_MAXUINT32 */ guint32 stream_start_seqnum; guint32 segment_seqnum; + guint group_id; } GstStream; /* Must be called with lock! */ @@ -243,86 +244,119 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, { GstStream *stream, *ostream; guint32 seqnum = gst_event_get_seqnum (event); + guint group_id; + gboolean have_group_id; GList *l; gboolean all_wait = TRUE; gboolean new_stream = TRUE; + have_group_id = gst_event_parse_group_id (event, &group_id); + GST_STREAM_SYNCHRONIZER_LOCK (self); + self->have_group_id &= have_group_id; + have_group_id = self->have_group_id; + stream = gst_pad_get_element_private (pad); - if (stream && stream->stream_start_seqnum != seqnum) { + + if (!stream) { + GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source"); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + break; + } + + if ((have_group_id && stream->group_id != group_id) || (!have_group_id + && stream->stream_start_seqnum != seqnum)) { stream->is_eos = FALSE; stream->stream_start_seqnum = seqnum; + stream->group_id = group_id; stream->drop_discont = TRUE; - /* Check if this belongs to a stream that is already there, - * e.g. we got the visualizations for an audio stream */ - for (l = self->streams; l; l = l->next) { - ostream = l->data; + if (!have_group_id) { + /* Check if this belongs to a stream that is already there, + * e.g. we got the visualizations for an audio stream */ + for (l = self->streams; l; l = l->next) { + ostream = l->data; - if (ostream != stream && ostream->stream_start_seqnum == seqnum - && !ostream->wait) { - new_stream = FALSE; + if (ostream != stream && ostream->stream_start_seqnum == seqnum + && !ostream->wait) { + new_stream = FALSE; + break; + } + } + + if (!new_stream) { + GST_DEBUG_OBJECT (pad, + "Stream %d belongs to running stream %d, no waiting", + stream->stream_number, ostream->stream_number); + stream->wait = FALSE; + stream->new_stream = FALSE; + + GST_STREAM_SYNCHRONIZER_UNLOCK (self); break; } + } else if (group_id == self->group_id) { + GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, " + "no waiting", stream->stream_number, group_id); + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + break; } - if (!new_stream) { - GST_DEBUG_OBJECT (pad, - "Stream %d belongs to running stream %d, no waiting", - stream->stream_number, ostream->stream_number); - stream->wait = FALSE; - stream->new_stream = FALSE; - } else { - GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number); + GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number); - stream->wait = TRUE; - stream->new_stream = TRUE; + stream->wait = TRUE; + stream->new_stream = TRUE; + + for (l = self->streams; l; l = l->next) { + GstStream *ostream = l->data; + + all_wait = all_wait && ostream->wait && (!have_group_id + || ostream->group_id == group_id); + if (!all_wait) + break; + } + + if (all_wait) { + gint64 position = 0; + + if (have_group_id) + GST_DEBUG_OBJECT (self, + "All streams have changed to group id %u -- unblocking", + group_id); + else + GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking"); + + self->group_id = group_id; for (l = self->streams; l; l = l->next) { GstStream *ostream = l->data; + gint64 stop_running_time; + gint64 position_running_time; - all_wait = all_wait && ostream->wait; - if (!all_wait) - break; + ostream->wait = FALSE; + + if (ostream->segment.format == GST_FORMAT_TIME) { + stop_running_time = + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.stop); + position_running_time = + gst_segment_to_running_time (&ostream->segment, + GST_FORMAT_TIME, ostream->segment.position); + position = + MAX (position, MAX (stop_running_time, + position_running_time)); + } } - if (all_wait) { - gint64 position = 0; + position = MAX (0, position); + self->group_start_time = MAX (self->group_start_time, position); - GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking"); + GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT, + GST_TIME_ARGS (self->group_start_time)); - for (l = self->streams; l; l = l->next) { - GstStream *ostream = l->data; - gint64 stop_running_time; - gint64 position_running_time; - - ostream->wait = FALSE; - - if (ostream->segment.format == GST_FORMAT_TIME) { - stop_running_time = - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.stop); - position_running_time = - gst_segment_to_running_time (&ostream->segment, - GST_FORMAT_TIME, ostream->segment.position); - position = - MAX (position, MAX (stop_running_time, - position_running_time)); - } - } - position = MAX (0, position); - self->group_start_time = MAX (self->group_start_time, position); - - GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT, - GST_TIME_ARGS (self->group_start_time)); - - for (l = self->streams; l; l = l->next) { - GstStream *ostream = l->data; - g_cond_broadcast (&ostream->stream_finish_cond); - } + for (l = self->streams; l; l = l->next) { + GstStream *ostream = l->data; + g_cond_broadcast (&ostream->stream_finish_cond); } } - } else { - GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source"); } GST_STREAM_SYNCHRONIZER_UNLOCK (self); @@ -657,6 +691,7 @@ gst_stream_synchronizer_request_new_pad (GstElement * element, g_cond_init (&stream->stream_finish_cond); stream->stream_start_seqnum = G_MAXUINT32; stream->segment_seqnum = G_MAXUINT32; + stream->group_id = G_MAXUINT; tmp = g_strdup_printf ("sink_%u", self->current_stream_number); stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp); @@ -717,6 +752,10 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self, } } g_assert (l != NULL); + if (self->streams == NULL) { + self->have_group_id = TRUE; + self->group_id = G_MAXUINT; + } /* we can drop the lock, since stream exists now only local. * Moreover, we should drop, to prevent deadlock with STREAM_LOCK @@ -799,6 +838,8 @@ gst_stream_synchronizer_change_state (GstElement * element, case GST_STATE_CHANGE_READY_TO_PAUSED: GST_DEBUG_OBJECT (self, "State change READY->PAUSED"); self->group_start_time = 0; + self->have_group_id = TRUE; + self->group_id = G_MAXUINT; self->shutdown = FALSE; break; case GST_STATE_CHANGE_PAUSED_TO_READY:{ diff --git a/gst/playback/gststreamsynchronizer.h b/gst/playback/gststreamsynchronizer.h index b9993122b6..2f5a3ecded 100644 --- a/gst/playback/gststreamsynchronizer.h +++ b/gst/playback/gststreamsynchronizer.h @@ -52,6 +52,9 @@ struct _GstStreamSynchronizer guint current_stream_number; GstClockTime group_start_time; + + gboolean have_group_id; + guint group_id; }; struct _GstStreamSynchronizerClass