streamsynchronizer: Handle stream switching

* Update outgoing segment.base with accumulated time, ensuring all
  streams are synchronized.
* Only consider streams as "new" is they have a STREAM_START event
  with a different seqnum.
* Use GstStream segment.base instead of separate variable to store
  the past running time.
* Disable passthrough
* Switch to glib 2.32 GMutex/GCond
* Avoid getting pad parent the expensive way
* Minor other fixes
This commit is contained in:
Edward Hervey 2012-08-14 18:53:52 +02:00
parent 784ca61ced
commit d86f6132ed
2 changed files with 69 additions and 136 deletions

View file

@ -22,7 +22,6 @@
#endif
#include "gststreamsynchronizer.h"
#include "gst/glib-compat-private.h"
GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
#define GST_CAT_DEFAULT stream_synchronizer_debug
@ -31,7 +30,7 @@ GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
GST_LOG_OBJECT (obj, \
"locking from thread %p", \
g_thread_self ()); \
g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
GST_LOG_OBJECT (obj, \
"locked from thread %p", \
g_thread_self ()); \
@ -41,7 +40,7 @@ GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
GST_LOG_OBJECT (obj, \
"unlocking from thread %p", \
g_thread_self ()); \
g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
} G_STMT_END
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
@ -53,8 +52,6 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static const gboolean passthrough = FALSE;
#define gst_stream_synchronizer_parent_class parent_class
G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
GST_TYPE_ELEMENT);
@ -67,42 +64,38 @@ typedef struct
GstPad *sinkpad;
GstSegment segment;
gboolean wait;
gboolean wait; /* TRUE if waiting/blocking */
gboolean new_stream;
gboolean drop_discont;
gboolean is_eos;
gboolean is_eos; /* TRUE if EOS was received */
gboolean seen_data;
gint64 running_time_diff;
GCond stream_finish_cond;
GCond *stream_finish_cond;
/* seqnum of the previously received STREAM_START
* default: G_MAXUINT32 */
guint32 stream_start_seqnum;
guint32 segment_seqnum;
} GstStream;
/* Must be called with lock! */
static GstPad *
static inline GstPad *
gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
{
if (stream->sinkpad == pad)
return gst_object_ref (stream->srcpad);
else if (stream->srcpad == pad)
if (stream->srcpad == pad)
return gst_object_ref (stream->sinkpad);
return NULL;
}
static GstPad *
gst_stream_get_other_pad_from_pad (GstPad * pad)
gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
{
GstObject *parent = gst_pad_get_parent (pad);
GstStreamSynchronizer *self;
GstStream *stream;
GstPad *opad = NULL;
/* released pad does not have parent anymore */
if (!G_LIKELY (parent))
goto exit;
self = GST_STREAM_SYNCHRONIZER (parent);
GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad);
if (!stream)
@ -112,9 +105,7 @@ gst_stream_get_other_pad_from_pad (GstPad * pad)
out:
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
gst_object_unref (self);
exit:
if (!opad)
GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
@ -129,7 +120,8 @@ gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
GstIterator *it = NULL;
GstPad *opad;
opad = gst_stream_get_other_pad_from_pad (pad);
opad =
gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
if (opad) {
GValue value = { 0, };
@ -152,7 +144,8 @@ gst_stream_synchronizer_query (GstPad * pad, GstObject * parent,
GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
opad = gst_stream_get_other_pad_from_pad (pad);
opad =
gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
if (opad) {
ret = gst_pad_peer_query (opad, query);
gst_object_unref (opad);
@ -170,9 +163,6 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
GstPad *opad;
gboolean ret = FALSE;
if (passthrough)
goto skip_adjustments;
GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
@ -181,7 +171,7 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
gdouble proportion;
GstClockTimeDiff diff;
GstClockTime timestamp;
gint64 running_time_diff;
gint64 running_time_diff = -1;
GstStream *stream;
gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
@ -190,15 +180,14 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad);
if (stream)
running_time_diff = stream->running_time_diff;
else
running_time_diff = -1;
running_time_diff = stream->segment.base;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
if (running_time_diff == -1) {
GST_WARNING_OBJECT (pad, "QOS event before group start");
goto out;
} else if (timestamp < running_time_diff) {
}
if (timestamp < running_time_diff) {
GST_DEBUG_OBJECT (pad, "QOS event from previous group");
goto out;
}
@ -227,9 +216,7 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
break;
}
skip_adjustments:
opad = gst_stream_get_other_pad_from_pad (pad);
opad = gst_stream_get_other_pad_from_pad (self, pad);
if (opad) {
ret = gst_pad_push_event (opad, event);
gst_object_unref (opad);
@ -248,9 +235,6 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
GstPad *opad;
gboolean ret = FALSE;
if (passthrough)
goto skip_adjustments;
GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
@ -258,12 +242,13 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
case GST_EVENT_STREAM_START:
{
GstStream *stream;
guint32 seqnum = gst_event_get_seqnum (event);
GList *l;
gboolean all_wait = TRUE;
GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad);
if (stream) {
GList *l;
gboolean all_wait = TRUE;
if (stream && stream->stream_start_seqnum != seqnum) {
GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
@ -310,10 +295,11 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
g_cond_broadcast (ostream->stream_finish_cond);
g_cond_broadcast (&ostream->stream_finish_cond);
}
}
}
} else
GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
}
break;
@ -328,7 +314,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
if (stream) {
if (stream->wait) {
GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
g_cond_wait (stream->stream_finish_cond, self->lock);
g_cond_wait (&stream->stream_finish_cond, &self->lock);
stream = gst_pad_get_element_private (pad);
if (stream)
stream->wait = FALSE;
@ -343,42 +329,9 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
if (stream && segment.format == GST_FORMAT_TIME) {
if (stream->new_stream) {
gint64 position_running_time = 0;
gint64 stop_running_time = 0;
if (stream->segment.format == GST_FORMAT_TIME) {
position_running_time =
gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
stream->segment.position);
position_running_time = MAX (position_running_time, 0);
stop_running_time =
gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
stream->segment.stop);
stop_running_time = MAX (position_running_time, 0);
if (stop_running_time != position_running_time) {
GST_WARNING_OBJECT (pad,
"Gap between position and segment stop: %" GST_TIME_FORMAT
" != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
GST_TIME_ARGS (position_running_time));
}
if (stop_running_time < position_running_time) {
GST_DEBUG_OBJECT (pad, "Updating stop position");
stream->segment.stop = stream->segment.position;
gst_pad_push_event (stream->srcpad,
gst_event_new_segment (&stream->segment));
}
stop_running_time = MAX (stop_running_time, position_running_time);
GST_DEBUG_OBJECT (pad,
"Stop running time of last group: %" GST_TIME_FORMAT,
GST_TIME_ARGS (stop_running_time));
}
stream->new_stream = FALSE;
stream->drop_discont = TRUE;
segment.base = stop_running_time;
segment.base = self->group_start_time;
}
GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
@ -386,14 +339,15 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
gst_segment_copy_into (&segment, &stream->segment);
GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
&stream->segment);
stream->segment_seqnum = gst_event_get_seqnum (event);
GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->segment.base));
stream->running_time_diff = stream->segment.base;
{
GstEvent *tmpev;
tmpev = gst_event_new_segment (&stream->segment);
gst_event_set_seqnum (tmpev, stream->segment_seqnum);
gst_event_unref (event);
event = tmpev;
}
@ -413,7 +367,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
stream = gst_pad_get_element_private (pad);
if (stream) {
GST_DEBUG_OBJECT (pad, "Flushing streams");
g_cond_broadcast (stream->stream_finish_cond);
g_cond_broadcast (&stream->stream_finish_cond);
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
@ -509,9 +463,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
break;
}
skip_adjustments:
opad = gst_stream_get_other_pad_from_pad (pad);
opad = gst_stream_get_other_pad_from_pad (self, pad);
if (opad) {
ret = gst_pad_push_event (opad, event);
gst_object_unref (opad);
@ -533,15 +485,6 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
GstClockTime timestamp = GST_CLOCK_TIME_NONE;
GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
if (passthrough) {
opad = gst_stream_get_other_pad_from_pad (pad);
if (opad) {
ret = gst_pad_push (opad, buffer);
gst_object_unref (opad);
}
goto done;
}
GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
" offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
@ -558,24 +501,25 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad);
if (stream)
if (stream) {
stream->seen_data = TRUE;
if (stream && stream->drop_discont) {
buffer = gst_buffer_make_writable (buffer);
GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
stream->drop_discont = FALSE;
}
if (stream->drop_discont) {
buffer = gst_buffer_make_writable (buffer);
GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
stream->drop_discont = FALSE;
}
if (stream && stream->segment.format == GST_FORMAT_TIME
&& GST_CLOCK_TIME_IS_VALID (timestamp)) {
GST_LOG_OBJECT (pad,
"Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
stream->segment.position = timestamp;
if (stream->segment.format == GST_FORMAT_TIME
&& GST_CLOCK_TIME_IS_VALID (timestamp)) {
GST_LOG_OBJECT (pad,
"Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
stream->segment.position = timestamp;
}
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
opad = gst_stream_get_other_pad_from_pad (pad);
opad = gst_stream_get_other_pad_from_pad (self, pad);
if (opad) {
ret = gst_pad_push (opad, buffer);
gst_object_unref (opad);
@ -590,7 +534,7 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
if (stream && stream->segment.format == GST_FORMAT_TIME
&& GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
GST_LOG_OBJECT (pad,
"Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
"Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->segment.position),
GST_TIME_ARGS (timestamp_end));
stream->segment.position = timestamp_end;
@ -637,7 +581,6 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
}
done:
return ret;
}
@ -657,7 +600,9 @@ gst_stream_synchronizer_request_new_pad (GstElement * element,
stream = g_slice_new0 (GstStream);
stream->transform = self;
stream->stream_number = self->current_stream_number;
stream->stream_finish_cond = g_cond_new ();
g_cond_init (&stream->stream_finish_cond);
stream->stream_start_seqnum = G_MAXUINT32;
stream->segment_seqnum = G_MAXUINT32;
tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
@ -743,14 +688,17 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
stream->segment.position);
stop_running_time = MAX (stop_running_time, position_running_time);
GST_DEBUG_OBJECT (stream->sinkpad,
"Stop running time was: %" GST_TIME_FORMAT,
GST_TIME_ARGS (stop_running_time));
if (stop_running_time > self->group_start_time) {
GST_DEBUG_OBJECT (stream->sinkpad,
"Updating global start running time from %" GST_TIME_FORMAT " to %"
GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
GST_TIME_ARGS (stop_running_time));
self->group_start_time = MAX (self->group_start_time, stop_running_time);
self->group_start_time = stop_running_time;
}
}
g_cond_free (stream->stream_finish_cond);
g_cond_clear (&stream->stream_finish_cond);
g_slice_free (GstStream, stream);
/* NOTE: In theory we have to check here if all streams
@ -787,7 +735,7 @@ gst_stream_synchronizer_change_state (GstElement * element,
GstStateChange transition)
{
GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
GstStateChangeReturn ret;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
@ -799,9 +747,6 @@ gst_stream_synchronizer_change_state (GstElement * element,
self->group_start_time = 0;
self->shutdown = FALSE;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:{
GList *l;
@ -810,7 +755,7 @@ gst_stream_synchronizer_change_state (GstElement * element,
GST_STREAM_SYNCHRONIZER_LOCK (self);
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
g_cond_broadcast (ostream->stream_finish_cond);
g_cond_broadcast (&ostream->stream_finish_cond);
}
self->shutdown = TRUE;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
@ -819,19 +764,12 @@ gst_stream_synchronizer_change_state (GstElement * element,
break;
}
{
GstStateChangeReturn bret;
bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
return ret;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
return ret;
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:{
GList *l;
@ -855,8 +793,6 @@ gst_stream_synchronizer_change_state (GstElement * element,
GST_DEBUG_OBJECT (self, "State change READY->NULL");
GST_STREAM_SYNCHRONIZER_LOCK (self);
while (self->streams)
gst_stream_synchronizer_release_stream (self, self->streams->data);
self->current_stream_number = 0;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
@ -874,10 +810,7 @@ gst_stream_synchronizer_finalize (GObject * object)
{
GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
if (self->lock) {
g_mutex_free (self->lock);
self->lock = NULL;
}
g_mutex_clear (&self->lock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -886,7 +819,7 @@ gst_stream_synchronizer_finalize (GObject * object)
static void
gst_stream_synchronizer_init (GstStreamSynchronizer * self)
{
self->lock = g_mutex_new ();
g_mutex_init (&self->lock);
}
static void

View file

@ -45,7 +45,7 @@ struct _GstStreamSynchronizer
GstElement parent;
/* < private > */
GMutex *lock;
GMutex lock;
gboolean shutdown;
GList *streams;