gstreamer/gst/playback/gststreamsynchronizer.c
Tim-Philipp Müller 8f039997f0 playbin2: disable streamsynchronizer magic for this release
Some things aren't quite right yet and cause problems (0-sized buffers
with PREROLL flag set cause crashes in elements that don't expect those;
getting the pipeline back to preroll/playing again when audio/video streams
have different lengths and a seek past the end of one of the streams
happens doesn't always work, etc.). Needs further investigation in the
next cycle.

https://bugzilla.gnome.org/show_bug.cgi?id=633700
https://bugzilla.gnome.org/show_bug.cgi?id=634699
2010-11-17 01:01:03 +00:00

982 lines
31 KiB
C

/* GStreamer
* Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gststreamsynchronizer.h"
/* File-local debug category; made the default so that the GST_*_OBJECT
 * logging macros used throughout this file log to it. It is initialised
 * in gst_stream_synchronizer_class_init(). */
GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
#define GST_CAT_DEFAULT stream_synchronizer_debug
/* Takes the element's mutex (obj->lock), logging the calling thread both
 * before the attempt and once the mutex is held. @obj is dereferenced, so
 * it must not be NULL. */
#define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START { \
GST_LOG_OBJECT (obj, \
"locking from thread %p", \
g_thread_self ()); \
g_mutex_lock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
GST_LOG_OBJECT (obj, \
"locked from thread %p", \
g_thread_self ()); \
} G_STMT_END
/* Releases the element's mutex (obj->lock) after logging the calling
 * thread. @obj is dereferenced, so it must not be NULL. */
#define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START { \
GST_LOG_OBJECT (obj, \
"unlocking from thread %p", \
g_thread_self ()); \
g_mutex_unlock (GST_STREAM_SYNCHRONIZER_CAST(obj)->lock); \
} G_STMT_END
/* Template for the "src_%d" pads. They are "sometimes" pads accepting any
 * caps; one is created alongside each requested sink pad in
 * gst_stream_synchronizer_request_new_pad(). */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%d",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS_ANY);
/* Template for the "sink_%d" request pads, accepting any caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%d",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
/* Compile-time switch: while TRUE the src/sink event handlers and the chain
 * function jump straight to forwarding events and buffers to the opposite
 * pad and skip all of the synchronisation logic in this file. */
static const gboolean passthrough = TRUE;
/* Type boilerplate for GstStreamSynchronizer, derived from GstElement.
 * NOTE(review): presumably this is what provides the parent_class symbol
 * and wires up the *_base_init/_class_init/_init functions defined at the
 * end of this file -- confirm against the GST_BOILERPLATE definition. */
GST_BOILERPLATE (GstStreamSynchronizer, gst_stream_synchronizer,
GstElement, GST_TYPE_ELEMENT);
/* Per-stream state, one instance per sink/src pad pair. It is stored as the
 * element-private data of both pads and in the element's streams list. */
typedef struct
{
/* Owning element (set when the pads are requested) */
GstStreamSynchronizer *transform;
/* Number used in the pad names and in debug output */
guint stream_number;
/* The pad pair; everything arriving on one is forwarded to the other */
GstPad *srcpad;
GstPad *sinkpad;
/* Segment tracked for this stream: configured from NEWSEGMENT events and
 * advanced with buffer timestamps in the chain function */
GstSegment segment;
/* Set when a "playbin2-stream-changed" message is seen on the sink pad;
 * a NEWSEGMENT arriving while this is set blocks on stream_finish_cond */
gboolean wait;
/* Set together with wait; cleared by the next TIME NEWSEGMENT, which is
 * then treated as the start of a new group */
gboolean new_stream;
/* When set, the DISCONT flag is removed from the next buffer */
gboolean drop_discont;
/* EOS was received on the sink pad */
gboolean is_eos;
/* A buffer was received on the sink pad (reset on FLUSH_STOP) */
gboolean seen_data;
/* segment.accum as of the last TIME NEWSEGMENT; subtracted from the
 * timestamps of upstream-travelling QoS events */
gint64 running_time_diff;
} GstStream;
/* Returns a new reference to the pad on the opposite side of @stream from
 * @pad (the srcpad for the sinkpad and vice versa), or NULL if @pad is
 * neither of the stream's pads. The caller owns the returned reference.
 *
 * Must be called with lock! */
static GstPad *
gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
{
  GstPad *opposite;

  if (pad == stream->sinkpad)
    opposite = stream->srcpad;
  else if (pad == stream->srcpad)
    opposite = stream->sinkpad;
  else
    return NULL;

  return gst_object_ref (opposite);
}
/* Looks up the stream attached to @pad and returns a new reference to the
 * pad on its opposite side, or NULL if @pad has no stream any more (it was
 * released) or is no longer parented to the element. Takes and releases the
 * element lock internally. The caller owns the returned reference. */
static GstPad *
gst_stream_get_other_pad_from_pad (GstPad * pad)
{
  GstObject *parent;
  GstStreamSynchronizer *self;
  GstStream *stream;
  GstPad *opad = NULL;

  /* A pad that was already removed from the element has no parent; the
   * lock lives in the element, so there is nothing we could look up. */
  parent = gst_pad_get_parent (pad);
  if (G_UNLIKELY (parent == NULL))
    goto out;

  self = GST_STREAM_SYNCHRONIZER (parent);

  GST_STREAM_SYNCHRONIZER_LOCK (self);
  stream = gst_pad_get_element_private (pad);
  if (stream)
    opad = gst_stream_get_other_pad (stream, pad);
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);

  gst_object_unref (self);

out:
  if (!opad)
    GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");

  return opad;
}
/* Generic pad functions */
/* Internal-links function shared by sink and src pads: yields a
 * single-element iterator holding the opposite pad of the same stream, or
 * returns NULL when that pad cannot be found. */
static GstIterator *
gst_stream_synchronizer_iterate_internal_links (GstPad * pad)
{
  GstIterator *result;
  GstPad *linked = gst_stream_get_other_pad_from_pad (pad);

  if (linked == NULL)
    return NULL;

  result = gst_iterator_new_single (GST_TYPE_PAD, linked,
      (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
  /* drop the reference obtained from the lookup */
  gst_object_unref (linked);

  return result;
}
/* Query function shared by sink and src pads: proxies @query to the peer of
 * the opposite pad. Returns FALSE when the opposite pad cannot be found,
 * otherwise whatever the peer query returned. */
static gboolean
gst_stream_synchronizer_query (GstPad * pad, GstQuery * query)
{
  GstPad *other;
  gboolean handled;

  GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));

  other = gst_stream_get_other_pad_from_pad (pad);
  if (other == NULL)
    return FALSE;

  handled = gst_pad_peer_query (other, query);
  gst_object_unref (other);

  return handled;
}
/* Getcaps function shared by sink and src pads: returns the caps reported
 * by the peer of the opposite pad, falling back to ANY caps when there is
 * no opposite pad or the peer query yields nothing. */
static GstCaps *
gst_stream_synchronizer_getcaps (GstPad * pad)
{
  GstCaps *caps = NULL;
  GstPad *other = gst_stream_get_other_pad_from_pad (pad);

  if (other != NULL) {
    caps = gst_pad_peer_get_caps (other);
    gst_object_unref (other);
  }

  if (!caps)
    caps = gst_caps_new_any ();

  GST_LOG_OBJECT (pad, "Returning caps: %" GST_PTR_FORMAT, caps);

  return caps;
}
/* Acceptcaps function shared by sink and src pads: asks the peer of the
 * opposite pad whether it accepts @caps. Returns FALSE when the opposite
 * pad cannot be found. */
static gboolean
gst_stream_synchronizer_acceptcaps (GstPad * pad, GstCaps * caps)
{
  gboolean accepted = FALSE;
  GstPad *other = gst_stream_get_other_pad_from_pad (pad);

  if (other != NULL) {
    accepted = gst_pad_peer_accept_caps (other, caps);
    gst_object_unref (other);
  }

  GST_LOG_OBJECT (pad, "Caps%s accepted: %" GST_PTR_FORMAT,
      (accepted ? "" : " not"), caps);

  return accepted;
}
/* srcpad functions */
/* Event function of the src pads (events travelling upstream).
 *
 * Takes ownership of @event. In passthrough mode the event is forwarded
 * unchanged to the matching sink pad. Otherwise QoS events have their
 * timestamp shifted back by the stream's running_time_diff (QoS events that
 * belong to a previous group are dropped) before being forwarded.
 *
 * Returns the result of pushing the event, TRUE for a QoS event dropped
 * because its adjusted timestamp would be invalid, FALSE otherwise. */
static gboolean
gst_stream_synchronizer_src_event (GstPad * pad, GstEvent * event)
{
  GstStreamSynchronizer *self =
      GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
  GstPad *opad;
  gboolean ret = FALSE;

  /* The pad may already have been removed from the element. We own the
   * event, so drop it instead of dereferencing/unreffing a NULL parent. */
  if (G_UNLIKELY (self == NULL)) {
    gst_event_unref (event);
    return FALSE;
  }

  if (passthrough)
    goto skip_adjustments;

  GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
      GST_EVENT_TYPE_NAME (event), event->structure);

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_QOS:{
      gdouble proportion;
      GstClockTimeDiff diff;
      GstClockTime timestamp;
      gint64 running_time_diff;
      GstStream *stream;

      /* The original event is always replaced (or dropped), so release it
       * as soon as its fields have been extracted */
      gst_event_parse_qos (event, &proportion, &diff, &timestamp);
      gst_event_unref (event);

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
      if (stream)
        running_time_diff = stream->running_time_diff;
      else
        running_time_diff = -1;
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);

      if (running_time_diff == -1) {
        GST_WARNING_OBJECT (pad, "QOS event before group start");
        goto out;
      } else if (timestamp < running_time_diff) {
        GST_DEBUG_OBJECT (pad, "QOS event from previous group");
        goto out;
      }

      GST_LOG_OBJECT (pad,
          "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
          GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
          GST_TIME_ARGS (running_time_diff),
          GST_TIME_ARGS (timestamp - running_time_diff));

      timestamp -= running_time_diff;

      /* That case is invalid for QoS events */
      if (diff < 0 && -diff > timestamp) {
        GST_DEBUG_OBJECT (pad, "QOS event from previous group");
        ret = TRUE;
        goto out;
      }

      event = gst_event_new_qos (proportion, diff, timestamp);
      break;
    }
    default:
      break;
  }

skip_adjustments:

  opad = gst_stream_get_other_pad_from_pad (pad);
  if (opad) {
    ret = gst_pad_push_event (opad, event);
    gst_object_unref (opad);
  } else {
    /* nowhere to send it: we still own the event, so don't leak it */
    gst_event_unref (event);
  }

out:
  gst_object_unref (self);

  return ret;
}
/* sinkpad functions */
/* Event function of the sink pads (events travelling downstream).
 *
 * Takes ownership of @event. In passthrough mode the event is forwarded
 * unchanged to the matching src pad. Otherwise:
 *  - SINK_MESSAGE carrying "playbin2-stream-changed" marks the stream as
 *    waiting for a new group; once all streams wait, the new group start
 *    time is computed and waiters are woken up.
 *  - NEWSEGMENT blocks while the stream is waiting, then shifts the first
 *    segment of a new group so it starts at the group start time, and
 *    records the segment.
 *  - FLUSH_STOP resets the per-stream state.
 *  - EOS is held back until every stream is EOS, then EOS is sent on all
 *    src pads. The incoming EOS event itself is never forwarded.
 *
 * Returns the result of pushing the event downstream; FALSE if it was
 * dropped. */
static gboolean
gst_stream_synchronizer_sink_event (GstPad * pad, GstEvent * event)
{
  GstStreamSynchronizer *self =
      GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
  GstPad *opad;
  gboolean ret = FALSE;

  /* The pad may already have been removed from the element. We own the
   * event, so drop it instead of dereferencing/unreffing a NULL parent. */
  if (G_UNLIKELY (self == NULL)) {
    gst_event_unref (event);
    return FALSE;
  }

  if (passthrough)
    goto skip_adjustments;

  GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
      GST_EVENT_TYPE_NAME (event), event->structure);

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_SINK_MESSAGE:{
      GstMessage *message;

      gst_event_parse_sink_message (event, &message);
      if (message->structure
          && gst_structure_has_name (message->structure,
              "playbin2-stream-changed")) {
        GstStream *stream;

        GST_STREAM_SYNCHRONIZER_LOCK (self);
        stream = gst_pad_get_element_private (pad);
        if (stream) {
          GList *l;
          gboolean all_wait = TRUE;

          GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);

          stream->is_eos = FALSE;
          stream->wait = TRUE;
          stream->new_stream = TRUE;

          for (l = self->streams; l; l = l->next) {
            GstStream *ostream = l->data;

            all_wait = all_wait && ostream->wait;
            if (!all_wait)
              break;
          }

          if (all_wait) {
            gint64 last_stop = 0;

            GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");

            /* The new group starts where the longest stream of the old
             * group ended (in running time) */
            for (l = self->streams; l; l = l->next) {
              GstStream *ostream = l->data;
              gint64 stop_running_time;
              gint64 last_stop_running_time;

              ostream->wait = FALSE;

              stop_running_time =
                  gst_segment_to_running_time (&ostream->segment,
                  GST_FORMAT_TIME, ostream->segment.stop);
              last_stop_running_time =
                  gst_segment_to_running_time (&ostream->segment,
                  GST_FORMAT_TIME, ostream->segment.last_stop);
              last_stop =
                  MAX (last_stop, MAX (stop_running_time,
                      last_stop_running_time));
            }
            last_stop = MAX (0, last_stop);

            self->group_start_time = MAX (self->group_start_time, last_stop);

            GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
                GST_TIME_ARGS (self->group_start_time));

            g_cond_broadcast (self->stream_finish_cond);
          }
        }
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      }
      gst_message_unref (message);
      break;
    }
    case GST_EVENT_NEWSEGMENT:{
      GstStream *stream;
      gboolean update;
      gdouble rate, applied_rate;
      GstFormat format;
      gint64 start, stop, position;

      gst_event_parse_new_segment_full (event,
          &update, &rate, &applied_rate, &format, &start, &stop, &position);

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
      if (stream) {
        if (stream->wait) {
          GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
          /* NOTE(review): waits exactly once; woken by the stream-changed
           * broadcast or by the PAUSED->READY broadcast */
          g_cond_wait (self->stream_finish_cond, self->lock);
          /* the pad may have been released while we were blocked */
          stream = gst_pad_get_element_private (pad);
          if (stream)
            stream->wait = FALSE;
        }
      }

      if (self->shutdown) {
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
        gst_event_unref (event);
        goto done;
      }

      if (stream && format == GST_FORMAT_TIME) {
        if (stream->new_stream) {
          gint64 last_stop_running_time = 0;
          gint64 stop_running_time = 0;

          if (stream->segment.format == GST_FORMAT_TIME) {
            last_stop_running_time =
                gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
                stream->segment.last_stop);
            last_stop_running_time = MAX (last_stop_running_time, 0);
            stop_running_time =
                gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
                stream->segment.stop);
            /* clamp the value just computed; clamping
             * last_stop_running_time here would throw it away and make the
             * two checks below dead code */
            stop_running_time = MAX (stop_running_time, 0);

            if (stop_running_time != last_stop_running_time) {
              GST_WARNING_OBJECT (pad,
                  "Gap between last_stop and segment stop: %" GST_TIME_FORMAT
                  " != %" GST_TIME_FORMAT, GST_TIME_ARGS (stop_running_time),
                  GST_TIME_ARGS (last_stop_running_time));
            }

            if (stop_running_time < last_stop_running_time) {
              GST_DEBUG_OBJECT (pad, "Updating stop position");
              gst_pad_push_event (stream->srcpad,
                  gst_event_new_new_segment_full (TRUE, stream->segment.rate,
                      stream->segment.applied_rate, GST_FORMAT_TIME,
                      stream->segment.start, stream->segment.last_stop,
                      stream->segment.time));
              gst_segment_set_newsegment_full (&stream->segment, TRUE,
                  stream->segment.rate, stream->segment.applied_rate,
                  GST_FORMAT_TIME, stream->segment.start,
                  stream->segment.last_stop, stream->segment.time);
            }
            stop_running_time = MAX (stop_running_time, last_stop_running_time);
            GST_DEBUG_OBJECT (pad,
                "Stop running time of last group: %" GST_TIME_FORMAT,
                GST_TIME_ARGS (stop_running_time));
          }
          stream->new_stream = FALSE;
          stream->drop_discont = TRUE;

          /* This stream ended earlier than the group: insert a filler
           * segment so its running time catches up with the others */
          if (stop_running_time < self->group_start_time) {
            gint64 diff = self->group_start_time - stop_running_time;

            GST_DEBUG_OBJECT (pad,
                "Advancing running time for other streams by: %"
                GST_TIME_FORMAT, GST_TIME_ARGS (diff));
            gst_pad_push_event (stream->srcpad,
                gst_event_new_new_segment_full (FALSE, 1.0, 1.0,
                    GST_FORMAT_TIME, 0, diff, 0));
            gst_segment_set_newsegment_full (&stream->segment, FALSE, 1.0, 1.0,
                GST_FORMAT_TIME, 0, diff, 0);
          }
        }

        GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
            &stream->segment);
        gst_segment_set_newsegment_full (&stream->segment, update, rate,
            applied_rate, format, start, stop, position);
        GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
            &stream->segment);

        GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
            GST_TIME_ARGS (stream->segment.accum));
        stream->running_time_diff = stream->segment.accum;
      } else if (stream) {
        GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
            gst_format_get_name (format));
        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
    case GST_EVENT_FLUSH_STOP:{
      GstStream *stream;

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
      if (stream) {
        GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
            stream->stream_number);
        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);

        stream->is_eos = FALSE;
        stream->wait = FALSE;
        stream->new_stream = FALSE;
        stream->drop_discont = FALSE;
        stream->seen_data = FALSE;
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
    case GST_EVENT_EOS:{
      GstStream *stream;
      GList *l;
      gboolean all_eos = TRUE;
      gboolean seen_data;
      GSList *pads = NULL;
      GstPad *srcpad;

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
      if (!stream) {
        /* pad was released concurrently: nothing to track or forward */
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
        gst_event_unref (event);
        goto done;
      }

      GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
      stream->is_eos = TRUE;

      seen_data = stream->seen_data;
      srcpad = gst_object_ref (stream->srcpad);

      for (l = self->streams; l; l = l->next) {
        GstStream *ostream = l->data;

        all_eos = all_eos && ostream->is_eos;
        if (!all_eos)
          break;
      }

      if (all_eos) {
        GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
        for (l = self->streams; l; l = l->next) {
          GstStream *ostream = l->data;

          /* local snapshot of current pads */
          gst_object_ref (ostream->srcpad);
          pads = g_slist_prepend (pads, ostream->srcpad);
        }
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);

      /* drop lock when sending eos, which may block in e.g. preroll */
      if (pads) {
        GSList *walk;

        ret = TRUE;
        for (walk = pads; walk; walk = g_slist_next (walk)) {
          GstPad *eos_pad = walk->data;

          GST_DEBUG_OBJECT (eos_pad, "Pushing EOS");
          ret = ret && gst_pad_push_event (eos_pad, gst_event_new_eos ());
          gst_object_unref (eos_pad);
        }
        g_slist_free (pads);
      } else {
        /* if EOS, but no data has passed, then send something to replace EOS
         * for preroll purposes */
        if (!seen_data) {
          GstBuffer *buf = gst_buffer_new ();

          GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_PREROLL);
          gst_pad_push (srcpad, buf);
        }
      }
      gst_object_unref (srcpad);
      /* fresh EOS events are generated above; the incoming one is never
       * forwarded, so release it here */
      gst_event_unref (event);
      goto done;
    }
    default:
      break;
  }

skip_adjustments:

  opad = gst_stream_get_other_pad_from_pad (pad);
  if (opad) {
    ret = gst_pad_push_event (opad, event);
    gst_object_unref (opad);
  } else {
    /* nowhere to send it: we still own the event, so don't leak it */
    gst_event_unref (event);
  }

done:
  gst_object_unref (self);

  return ret;
}
/* Bufferalloc function of the sink pads: forwards the allocation request
 * to the peer of the matching src pad. Returns GST_FLOW_ERROR when the
 * src pad cannot be found, otherwise the result of the allocation. */
static GstFlowReturn
gst_stream_synchronizer_sink_bufferalloc (GstPad * pad, guint64 offset,
    guint size, GstCaps * caps, GstBuffer ** buf)
{
  GstFlowReturn flow = GST_FLOW_ERROR;
  GstPad *other;

  GST_LOG_OBJECT (pad, "Allocating buffer: size=%u", size);

  other = gst_stream_get_other_pad_from_pad (pad);
  if (other != NULL) {
    flow = gst_pad_alloc_buffer (other, offset, size, caps, buf);
    gst_object_unref (other);
  }

  GST_LOG_OBJECT (pad, "Allocation: %s", gst_flow_get_name (flow));

  return flow;
}
/* Chain function of the sink pads.
 *
 * Takes ownership of @buffer. In passthrough mode the buffer is pushed
 * unchanged on the matching src pad. Otherwise the stream's segment
 * position is tracked from the buffer timestamps, the DISCONT flag is
 * removed from the first buffer of a new group, and -- after a successful
 * push -- streams that are already EOS are advanced with segment updates
 * once they lag more than one second behind this buffer's end time.
 *
 * Returns the flow return of the downstream push, or GST_FLOW_ERROR if the
 * buffer could not be forwarded because the pad was already released. */
static GstFlowReturn
gst_stream_synchronizer_sink_chain (GstPad * pad, GstBuffer * buffer)
{
  GstStreamSynchronizer *self =
      GST_STREAM_SYNCHRONIZER (gst_pad_get_parent (pad));
  GstPad *opad;
  GstFlowReturn ret = GST_FLOW_ERROR;
  GstStream *stream;
  GstClockTime timestamp = GST_CLOCK_TIME_NONE;
  GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;

  /* The pad may already have been removed from the element. We own the
   * buffer, so drop it instead of dereferencing/unreffing a NULL parent. */
  if (G_UNLIKELY (self == NULL)) {
    gst_buffer_unref (buffer);
    return ret;
  }

  if (passthrough) {
    opad = gst_stream_get_other_pad_from_pad (pad);
    if (opad) {
      ret = gst_pad_push (opad, buffer);
      gst_object_unref (opad);
    } else {
      /* nowhere to send it: we still own the buffer, so don't leak it */
      gst_buffer_unref (buffer);
    }
    goto done;
  }

  GST_LOG_OBJECT (pad, "Handling buffer %p: size=%u, timestamp=%"
      GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
      " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
      buffer, GST_BUFFER_SIZE (buffer),
      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
      GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));

  timestamp = GST_BUFFER_TIMESTAMP (buffer);
  if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
      && GST_BUFFER_DURATION_IS_VALID (buffer))
    timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);

  GST_STREAM_SYNCHRONIZER_LOCK (self);
  stream = gst_pad_get_element_private (pad);

  /* stream is NULL once the pad has been released */
  if (stream) {
    stream->seen_data = TRUE;

    if (stream->drop_discont) {
      buffer = gst_buffer_make_metadata_writable (buffer);
      GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
      stream->drop_discont = FALSE;
    }

    if (stream->segment.format == GST_FORMAT_TIME
        && GST_CLOCK_TIME_IS_VALID (timestamp)) {
      GST_LOG_OBJECT (pad,
          "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
          GST_TIME_ARGS (stream->segment.last_stop), GST_TIME_ARGS (timestamp));
      gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME, timestamp);
    }
  }
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);

  opad = gst_stream_get_other_pad_from_pad (pad);
  if (opad) {
    ret = gst_pad_push (opad, buffer);
    gst_object_unref (opad);
  } else {
    /* nowhere to send it: we still own the buffer, so don't leak it */
    gst_buffer_unref (buffer);
  }

  GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
  if (ret == GST_FLOW_OK) {
    GList *l;

    GST_STREAM_SYNCHRONIZER_LOCK (self);
    /* re-fetch: the pad may have been released during the push */
    stream = gst_pad_get_element_private (pad);
    if (stream && stream->segment.format == GST_FORMAT_TIME
        && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
      GST_LOG_OBJECT (pad,
          "Updating last-stop from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
          GST_TIME_ARGS (stream->segment.last_stop),
          GST_TIME_ARGS (timestamp_end));
      gst_segment_set_last_stop (&stream->segment, GST_FORMAT_TIME,
          timestamp_end);
    }

    /* Advance EOS streams if necessary. For non-EOS
     * streams the demuxers should already do this! */
    for (l = self->streams; l; l = l->next) {
      GstStream *ostream = l->data;
      gint64 last_stop;

      if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
        continue;

      if (ostream->segment.last_stop != -1)
        last_stop = ostream->segment.last_stop;
      else
        last_stop = ostream->segment.start;

      /* Is there a 1 second lag? Without a valid end time there is
       * nothing to compare against: GST_CLOCK_TIME_NONE would otherwise
       * be taken as a huge timestamp and yield a bogus segment. */
      if (last_stop != -1 && GST_CLOCK_TIME_IS_VALID (timestamp_end)
          && last_stop + GST_SECOND < timestamp_end) {
        gint64 new_start, new_stop;

        new_start = timestamp_end - GST_SECOND;
        if (ostream->segment.stop == -1)
          new_stop = -1;
        else
          new_stop = MAX (new_start, ostream->segment.stop);

        GST_DEBUG_OBJECT (ostream->sinkpad,
            "Advancing stream %u from %" GST_TIME_FORMAT " to %"
            GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (last_stop),
            GST_TIME_ARGS (new_start));

        gst_pad_push_event (ostream->srcpad,
            gst_event_new_new_segment_full (TRUE, ostream->segment.rate,
                ostream->segment.applied_rate, ostream->segment.format,
                new_start, new_stop, new_start));
        gst_segment_set_newsegment_full (&ostream->segment, TRUE,
            ostream->segment.rate, ostream->segment.applied_rate,
            ostream->segment.format, new_start, new_stop, new_start);
        gst_segment_set_last_stop (&ostream->segment, GST_FORMAT_TIME,
            new_start);
      }
    }
    GST_STREAM_SYNCHRONIZER_UNLOCK (self);
  }

done:

  gst_object_unref (self);

  return ret;
}
/* GstElement vfuncs */
static GstPad *
gst_stream_synchronizer_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * name)
{
GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
GstStream *stream;
gchar *tmp;
GST_STREAM_SYNCHRONIZER_LOCK (self);
GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
self->current_stream_number);
stream = g_slice_new0 (GstStream);
stream->transform = self;
stream->stream_number = self->current_stream_number;
tmp = g_strdup_printf ("sink_%d", self->current_stream_number);
stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
g_free (tmp);
gst_pad_set_element_private (stream->sinkpad, stream);
gst_pad_set_iterate_internal_links_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
gst_pad_set_query_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
gst_pad_set_getcaps_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
gst_pad_set_acceptcaps_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
gst_pad_set_event_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
gst_pad_set_chain_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
gst_pad_set_bufferalloc_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_bufferalloc));
tmp = g_strdup_printf ("src_%d", self->current_stream_number);
stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
g_free (tmp);
gst_pad_set_element_private (stream->srcpad, stream);
gst_pad_set_iterate_internal_links_function (stream->srcpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
gst_pad_set_query_function (stream->srcpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
gst_pad_set_getcaps_function (stream->srcpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_getcaps));
gst_pad_set_acceptcaps_function (stream->srcpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_acceptcaps));
gst_pad_set_event_function (stream->srcpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
self->streams = g_list_prepend (self->streams, stream);
self->current_stream_number++;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* Add pads and activate unless we're going to NULL */
g_static_rec_mutex_lock (GST_STATE_GET_LOCK (self));
if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
gst_pad_set_active (stream->srcpad, TRUE);
gst_pad_set_active (stream->sinkpad, TRUE);
}
gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
return stream->sinkpad;
}
/* Tears down @stream: unlinks it from self->streams, detaches it from both
 * pads, deactivates the pads and removes them from the element, folds the
 * stream's final running time into self->group_start_time and finally
 * frees the GstStream.
 *
 * The lock is released part-way through and taken again before returning,
 * so callers must not assume that lock-protected state is unchanged across
 * this call.
 *
 * Must be called with lock! */
static void
gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
GstStream * stream)
{
GList *l;
GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
for (l = self->streams; l; l = l->next) {
if (l->data == stream) {
self->streams = g_list_delete_link (self->streams, l);
break;
}
}
/* l is only compared here, never dereferenced: NULL means the loop ran to
 * the end without finding @stream in the list */
g_assert (l != NULL);
/* we can drop the lock, since stream exists now only local.
* Moreover, we should drop, to prevent deadlock with STREAM_LOCK
* (due to reverse lock order) when deactivating pads */
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* From here on pad functions find no stream on these pads any more */
gst_pad_set_element_private (stream->srcpad, NULL);
gst_pad_set_element_private (stream->sinkpad, NULL);
gst_pad_set_active (stream->srcpad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
gst_pad_set_active (stream->sinkpad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
/* Make sure a following group does not start before this stream ended.
 * NOTE(review): group_start_time is written here while the lock is not
 * held -- confirm this cannot race with the sink event handler. */
if (stream->segment.format == GST_FORMAT_TIME) {
gint64 stop_running_time;
gint64 last_stop_running_time;
stop_running_time =
gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
stream->segment.stop);
last_stop_running_time =
gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
stream->segment.last_stop);
stop_running_time = MAX (stop_running_time, last_stop_running_time);
GST_DEBUG_OBJECT (stream->sinkpad,
"Stop running time was: %" GST_TIME_FORMAT,
GST_TIME_ARGS (stop_running_time));
self->group_start_time = MAX (self->group_start_time, stop_running_time);
}
g_slice_free (GstStream, stream);
/* NOTE: In theory we have to check here if all streams
* are EOS but the one that was removed wasn't and then
* send EOS downstream. But due to the way how playsink
* works this is not necessary and will only cause problems
* for gapless playback. playsink will only add/remove pads
* when it's reconfigured, which happens when the streams
* change
*/
/* lock for good measure, since the caller had it */
GST_STREAM_SYNCHRONIZER_LOCK (self);
}
/* GstElement::release_pad implementation: releases the stream that @pad
 * (which must be the stream's sink pad) belongs to. Does nothing if the
 * pad has no stream attached any more. */
static void
gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
{
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
  GstStream *owner;

  GST_STREAM_SYNCHRONIZER_LOCK (self);

  owner = gst_pad_get_element_private (pad);
  if (owner != NULL) {
    /* only sink pads are request pads */
    g_assert (stream_is_sinkpad_of (owner, pad));
    gst_stream_synchronizer_release_stream (self, owner);
  }

  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
}
/* GstElement::change_state implementation.
 *
 * Before chaining up: clears the shutdown flag on upward transitions,
 * resets the group start time for READY->PAUSED, and for PAUSED->READY sets
 * the shutdown flag and wakes up every streaming thread blocked on
 * stream_finish_cond.
 *
 * After chaining up: resets all per-stream state for PAUSED->READY and
 * releases all remaining streams for READY->NULL.
 *
 * Returns GST_STATE_CHANGE_FAILURE if the parent class failed, otherwise
 * GST_STATE_CHANGE_SUCCESS. */
static GstStateChangeReturn
gst_stream_synchronizer_change_state (GstElement * element,
    GstStateChange transition)
{
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;

  switch (transition) {
    case GST_STATE_CHANGE_NULL_TO_READY:
      GST_DEBUG_OBJECT (self, "State change NULL->READY");
      self->shutdown = FALSE;
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
      self->group_start_time = 0;
      self->shutdown = FALSE;
      break;
    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
      GST_DEBUG_OBJECT (self, "State change PAUSED->PLAYING");
      break;
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      GST_DEBUG_OBJECT (self, "State change PAUSED->READY");

      /* Unblock streaming threads waiting for a new group so that the
       * pads can be deactivated by the parent class */
      GST_STREAM_SYNCHRONIZER_LOCK (self);
      g_cond_broadcast (self->stream_finish_cond);
      self->shutdown = TRUE;
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    default:
      break;
  }

  {
    GstStateChangeReturn bret;

    bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
    GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", bret);
    /* propagate the failure; returning ret here would report SUCCESS */
    if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
      return bret;
  }

  switch (transition) {
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
      GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
      break;
    case GST_STATE_CHANGE_PAUSED_TO_READY:{
      GList *l;

      GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
      self->group_start_time = 0;

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      for (l = self->streams; l; l = l->next) {
        GstStream *stream = l->data;

        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
        stream->wait = FALSE;
        stream->new_stream = FALSE;
        stream->drop_discont = FALSE;
        stream->is_eos = FALSE;
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
    case GST_STATE_CHANGE_READY_TO_NULL:{
      GST_DEBUG_OBJECT (self, "State change READY->NULL");

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      /* release_stream() unlinks the head of the list on every call */
      while (self->streams)
        gst_stream_synchronizer_release_stream (self, self->streams->data);
      self->current_stream_number = 0;
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
    default:
      break;
  }

  return ret;
}
/* GObject vfuncs */
/* GObject::finalize implementation: frees the mutex and the condition
 * variable created in the instance init function, then chains up. */
static void
gst_stream_synchronizer_finalize (GObject * object)
{
  GstStreamSynchronizer *sync = GST_STREAM_SYNCHRONIZER (object);

  if (sync->lock != NULL) {
    g_mutex_free (sync->lock);
    sync->lock = NULL;
  }

  if (sync->stream_finish_cond != NULL) {
    g_cond_free (sync->stream_finish_cond);
    sync->stream_finish_cond = NULL;
  }

  G_OBJECT_CLASS (parent_class)->finalize (object);
}
/* GObject type initialization */
/* Instance initialisation: allocates the element lock and the condition
 * variable that streams wait on; both are freed again in finalize.
 * @klass is unused. */
static void
gst_stream_synchronizer_init (GstStreamSynchronizer * self,
    GstStreamSynchronizerClass * klass)
{
  GMutex *element_lock = g_mutex_new ();
  GCond *finish_cond = g_cond_new ();

  self->lock = element_lock;
  self->stream_finish_cond = finish_cond;
}
/* Class base initialisation: registers the src and sink pad templates and
 * the element details on the element class. */
static void
gst_stream_synchronizer_base_init (gpointer g_class)
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
  GstPadTemplate *pad_template;

  pad_template = gst_static_pad_template_get (&srctemplate);
  gst_element_class_add_pad_template (element_class, pad_template);

  pad_template = gst_static_pad_template_get (&sinktemplate);
  gst_element_class_add_pad_template (element_class, pad_template);

  gst_element_class_set_details_simple (element_class,
      "Stream Synchronizer", "Generic",
      "Synchronizes a group of streams to have equal durations and starting points",
      "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
}
/* Class initialisation: sets up the debug category and installs the
 * GObject and GstElement virtual method implementations of this file. */
static void
gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
{
  GstElementClass *gstelement_klass = (GstElementClass *) klass;
  GObjectClass *gobject_klass = (GObjectClass *) klass;

  GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
      "streamsynchronizer", 0, "Stream Synchronizer");

  /* GObject vfuncs */
  gobject_klass->finalize = gst_stream_synchronizer_finalize;

  /* GstElement vfuncs */
  gstelement_klass->change_state =
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
  gstelement_klass->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
  gstelement_klass->release_pad =
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
}