gst/adder/gstadder.*: Updated some docs. Added comments and FIXMEs all over the place.

Original commit message from CVS:
* gst/adder/gstadder.c: (gst_adder_setcaps),
(gst_adder_query_duration), (gst_adder_query), (forward_event),
(gst_adder_src_event), (gst_adder_sink_event),
(gst_adder_class_init), (gst_adder_finalize),
(gst_adder_request_new_pad), (gst_adder_collected):
* gst/adder/gstadder.h:
Updated some docs. Added comments and FIXMEs all over the place.
Improve debugging info.
Fix leak on finalize by not calling the parent.
Implement duration query.
Make event forwarding threadsafe.
Correctly send NEWSEGMENT at start and after flush.
Handle EOS correctly.
Post error when not negotiated.
* tests/check/elements/adder.c: (GST_START_TEST):
Added FIXME in the test.
This commit is contained in:
Wim Taymans 2006-05-10 11:54:36 +00:00
parent d8965c30fb
commit f96d80accc
4 changed files with 278 additions and 67 deletions

View file

@ -1,3 +1,23 @@
2006-05-10 Wim Taymans <wim@fluendo.com>
* gst/adder/gstadder.c: (gst_adder_setcaps),
(gst_adder_query_duration), (gst_adder_query), (forward_event),
(gst_adder_src_event), (gst_adder_sink_event),
(gst_adder_class_init), (gst_adder_finalize),
(gst_adder_request_new_pad), (gst_adder_collected):
* gst/adder/gstadder.h:
Updated some docs. Added comments and FIXMEs all over the place.
Improve debugging info.
Fix leak on finalize by not calling the parent.
Implement duration query.
Make event forwarding threadsafe.
Correctly send NEWSEGMENT at start and after flush.
Handle EOS correctly.
Post error when not negotiated.
* tests/check/elements/adder.c: (GST_START_TEST):
Added FIXME in the test.
2006-05-09 Tim-Philipp Müller <tim at centricular dot net> 2006-05-09 Tim-Philipp Müller <tim at centricular dot net>
* ext/pango/gsttextoverlay.c: (gst_text_overlay_valign_get_type), * ext/pango/gsttextoverlay.c: (gst_text_overlay_valign_get_type),

View file

@ -1,7 +1,7 @@
/* GStreamer /* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2001 Thomas <thomas@apestaart.org> * 2001 Thomas <thomas@apestaart.org>
* 2005 Wim Taymans <wim@fluendo.com> * 2005,2006 Wim Taymans <wim@fluendo.com>
* *
* adder.c: Adder element, N in, one out, samples are added * adder.c: Adder element, N in, one out, samples are added
* *
@ -33,7 +33,13 @@
* </programlisting> * </programlisting>
* This pipeline produces two sine waves mixed together. * This pipeline produces two sine waves mixed together.
* </para> * </para>
* <para>
* The Adder currently mixes all data received on the sinkpads as soon as possible
* without trying to synchronize the streams.
* </para>
* </refsect2> * </refsect2>
*
* Last reviewed on 2006-05-09 (0.10.7)
*/ */
/* Element-Checklist-Version: 5 */ /* Element-Checklist-Version: 5 */
@ -86,11 +92,12 @@ static GstStaticPadTemplate gst_adder_sink_template =
static void gst_adder_class_init (GstAdderClass * klass); static void gst_adder_class_init (GstAdderClass * klass);
static void gst_adder_init (GstAdder * adder); static void gst_adder_init (GstAdder * adder);
static void gst_adder_dispose (GObject * object); static void gst_adder_finalize (GObject * object);
static gboolean gst_adder_setcaps (GstPad * pad, GstCaps * caps); static gboolean gst_adder_setcaps (GstPad * pad, GstCaps * caps);
static gboolean gst_adder_query (GstPad * pad, GstQuery * query); static gboolean gst_adder_query (GstPad * pad, GstQuery * query);
static gboolean gst_adder_src_event (GstPad * pad, GstEvent * event); static gboolean gst_adder_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_adder_sink_event (GstPad * pad, GstEvent * event);
static GstPad *gst_adder_request_new_pad (GstElement * element, static GstPad *gst_adder_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * unused); GstPadTemplate * temp, const gchar * unused);
@ -151,7 +158,8 @@ gst_adder_setcaps (GstPad * pad, GstCaps * caps)
adder = GST_ADDER (GST_PAD_PARENT (pad)); adder = GST_ADDER (GST_PAD_PARENT (pad));
/* see if the other pads can accept the format */ /* FIXME, see if the other pads can accept the format. Also lock the
* format on the other pads to this new format. */
GST_OBJECT_LOCK (adder); GST_OBJECT_LOCK (adder);
pads = GST_ELEMENT (adder)->pads; pads = GST_ELEMENT (adder)->pads;
while (pads) { while (pads) {
@ -215,15 +223,73 @@ gst_adder_setcaps (GstPad * pad, GstCaps * caps)
gst_structure_get_int (structure, "channels", &adder->channels); gst_structure_get_int (structure, "channels", &adder->channels);
gst_structure_get_int (structure, "rate", &adder->rate); gst_structure_get_int (structure, "rate", &adder->rate);
/* precalc bps */
adder->bps = (adder->width / 8) * adder->channels;
return TRUE; return TRUE;
not_supported: not_supported:
{ {
GST_DEBUG_OBJECT (adder, "unsupported format set as caps");
return FALSE; return FALSE;
} }
} }
/* FIXME, the duration query should reflect how long you will produce
* data, that is the amount of stream time until you will emit EOS.
* For synchronized mixing this
* is always the max of all the durations of upstream since we emit
* EOS when all of them finished.
* We don't do synchronized mixing so this really depends on where the
* streams where punched in and what their relative offsets are against
* eachother which we can get from the first timestamps we see.
* When we add a new stream (or remove a stream) the duration might
* also become invalid again and we need to post a new DURATION
* message to ntify this fact to the parent.
* For now we take the max of all the upstream elements so the simple
* cases work at least somewhat. */
static gboolean
gst_adder_query_duration (GstAdder * adder, GstQuery * query)
{
GList *pads;
gint64 max;
gboolean res;
GstFormat format;
max = -1;
res = TRUE;
/* parse format */
gst_query_parse_duration (query, &format, NULL);
GST_OBJECT_LOCK (adder);
pads = GST_ELEMENT_CAST (adder)->sinkpads;
for (; pads; pads = g_list_next (pads)) {
GstPad *pad = GST_PAD_CAST (pads->data);
gint64 duration;
/* ask sink peer for duration */
res &= gst_pad_query_peer_duration (pad, &format, &duration);
/* take max from all valid return values */
if (res) {
/* valid unknown length, stop searching */
if (duration == -1) {
max = duration;
break;
}
/* else see if bigger than current max */
else if (duration > max)
max = duration;
}
}
GST_OBJECT_UNLOCK (adder);
/* and store the max */
gst_query_set_duration (query, format, max);
return res;
}
static gboolean static gboolean
gst_adder_query (GstPad * pad, GstQuery * query) gst_adder_query (GstPad * pad, GstQuery * query)
{ {
@ -239,11 +305,12 @@ gst_adder_query (GstPad * pad, GstQuery * query)
switch (format) { switch (format) {
case GST_FORMAT_TIME: case GST_FORMAT_TIME:
gst_query_set_position (query, GST_FORMAT_TIME, adder->timestamp); /* FIXME, bring to stream time, might be tricky */
gst_query_set_position (query, format, adder->timestamp);
res = TRUE; res = TRUE;
break; break;
case GST_FORMAT_DEFAULT: case GST_FORMAT_DEFAULT:
gst_query_set_position (query, GST_FORMAT_DEFAULT, adder->offset); gst_query_set_position (query, format, adder->offset);
res = TRUE; res = TRUE;
break; break;
default: default:
@ -251,12 +318,12 @@ gst_adder_query (GstPad * pad, GstQuery * query)
} }
break; break;
} }
/* FIXME: what to do about the length? query all pads upstream and
* pick the longest length? or the shortest length? or what? */
case GST_QUERY_DURATION: case GST_QUERY_DURATION:
res = FALSE; res = gst_adder_query_duration (adder, query);
break; break;
default: default:
/* FIXME, needs a custom query handler because we have multiple
* sinkpads */
res = gst_pad_query_default (pad, query); res = gst_pad_query_default (pad, query);
break; break;
} }
@ -265,33 +332,110 @@ gst_adder_query (GstPad * pad, GstQuery * query)
return res; return res;
} }
/* forwards the event to all sinkpads, takes ownership of the
* event
*
* Returns: TRUE if the event could be forwarded on all
* sinkpads.
*/
static gboolean
forward_event (GstAdder * adder, GstEvent * event)
{
gboolean ret;
GList *pads;
GST_LOG_OBJECT (adder, "Forwarding event %p (%s)", event,
GST_EVENT_TYPE_NAME (event));
ret = TRUE;
GST_OBJECT_LOCK (adder);
pads = GST_ELEMENT_CAST (adder)->sinkpads;
for (; pads; pads = g_list_next (pads)) {
GstPad *pad = GST_PAD_CAST (pads->data);
gst_event_ref (event);
ret &= gst_pad_push_event (pad, event);
if (!ret) {
GST_WARNING_OBJECT (pad, "Sending event %p (%s) failed.",
event, GST_EVENT_TYPE_NAME (event));
break;
} else {
GST_LOG_OBJECT (pad, "Sent event %p (%s).",
event, GST_EVENT_TYPE_NAME (event));
}
}
GST_OBJECT_UNLOCK (adder);
gst_event_unref (event);
return ret;
}
static gboolean static gboolean
gst_adder_src_event (GstPad * pad, GstEvent * event) gst_adder_src_event (GstPad * pad, GstEvent * event)
{ {
GstAdder *adder = GST_ADDER (gst_pad_get_parent (pad)); GstAdder *adder;
GSList *node; gboolean result;
GstCollectData *data;
gboolean result = TRUE;
GST_LOG_OBJECT (pad, "Sending event %p (%s)", event, adder = GST_ADDER (gst_pad_get_parent (pad));
GST_EVENT_TYPE_NAME (event));
for (node = adder->collect->data; (node && result); switch (GST_EVENT_TYPE (event)) {
node = g_slist_next (node)) { case GST_EVENT_QOS:
data = (GstCollectData *) node->data; /* QoS might be tricky */
result = FALSE;
GST_LOG_OBJECT (pad, " to %s:%s", GST_DEBUG_PAD_NAME (data->pad)); break;
gst_event_ref (event); case GST_EVENT_SEEK:
result &= gst_pad_push_event (data->pad, event); /* FIXME seek needs something smarter. */
if (!result) { result = forward_event (adder, event);
GST_WARNING ("Sending event %p (%s) to %s:%s failed.", break;
event, GST_EVENT_TYPE_NAME (event), GST_DEBUG_PAD_NAME (data->pad)); case GST_EVENT_NAVIGATION:
} /* navigation is rather pointless. */
result = FALSE;
break;
default:
/* just forward the rest for now */
result = forward_event (adder, event);
break;
} }
gst_object_unref (adder); gst_object_unref (adder);
return result; return result;
} }
static gboolean
gst_adder_sink_event (GstPad * pad, GstEvent * event)
{
GstAdder *adder;
gboolean ret;
adder = GST_ADDER (gst_pad_get_parent (pad));
GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
GST_DEBUG_PAD_NAME (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
/* mark a pending new segment. This event is synchronized
* with the streaming thread so we can safely update the
* variable without races. It's somewhat weird because we
* assume the collectpads forwarded the FLUSH_STOP past us
* and downstream (using our source pad, the bastard!).
*/
adder->segment_pending = TRUE;
break;
default:
break;
}
/* now GstCollectPads can take care of the rest, e.g. EOS */
ret = adder->collect_event (pad, event);
gst_object_unref (adder);
return ret;
}
static void static void
gst_adder_class_init (GstAdderClass * klass) gst_adder_class_init (GstAdderClass * klass)
{ {
@ -300,7 +444,7 @@ gst_adder_class_init (GstAdderClass * klass)
gobject_class = (GObjectClass *) klass; gobject_class = (GObjectClass *) klass;
gobject_class->dispose = gst_adder_dispose; gobject_class->finalize = gst_adder_finalize;
gstelement_class = (GstElementClass *) klass; gstelement_class = (GstElementClass *) klass;
@ -345,12 +489,14 @@ gst_adder_init (GstAdder * adder)
} }
static void static void
gst_adder_dispose (GObject * object) gst_adder_finalize (GObject * object)
{ {
GstAdder *adder = GST_ADDER (object); GstAdder *adder = GST_ADDER (object);
gst_object_unref (adder->collect); gst_object_unref (adder->collect);
adder->collect = NULL; adder->collect = NULL;
G_OBJECT_CLASS (parent_class)->finalize (object);
} }
static GstPad * static GstPad *
@ -376,6 +522,12 @@ gst_adder_request_new_pad (GstElement * element, GstPadTemplate * templ,
gst_pad_set_setcaps_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_setcaps)); gst_pad_set_setcaps_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_setcaps));
gst_collect_pads_add_pad (adder->collect, newpad, sizeof (GstCollectData)); gst_collect_pads_add_pad (adder->collect, newpad, sizeof (GstCollectData));
/* FIXME: hacked way to override/extend the event function of
* GstCollectPads; because it sets its own event function giving the
* element no access to events */
adder->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC (newpad);
gst_pad_set_event_function (newpad, GST_DEBUG_FUNCPTR (gst_adder_sink_event));
if (!gst_element_add_pad (GST_ELEMENT (adder), newpad)) if (!gst_element_add_pad (GST_ELEMENT (adder), newpad))
goto could_not_add; goto could_not_add;
@ -419,19 +571,20 @@ gst_adder_collected (GstCollectPads * pads, gpointer user_data)
adder = GST_ADDER (user_data); adder = GST_ADDER (user_data);
/* get available bytes for reading */ /* this is fatal */
size = gst_collect_pads_available (pads); if (G_UNLIKELY (adder->func == NULL))
if (size == 0) goto not_negotiated;
return GST_FLOW_OK;
outbuf = NULL; outbuf = NULL;
outbytes = NULL; outbytes = NULL;
if (adder->func == NULL) /* get available bytes for reading, this can be 0 which could mean
goto not_negotiated; * empty buffers or EOS, which we will catch when we loop over the
* pads. */
size = gst_collect_pads_available (pads);
GST_LOG_OBJECT (adder, GST_LOG_OBJECT (adder,
"starting to cycle through channels, collecting %d bytes", size); "starting to cycle through channels, %d bytes available", size);
for (collected = pads->data; collected; collected = g_slist_next (collected)) { for (collected = pads->data; collected; collected = g_slist_next (collected)) {
GstCollectData *data; GstCollectData *data;
@ -440,77 +593,107 @@ gst_adder_collected (GstCollectPads * pads, gpointer user_data)
data = (GstCollectData *) collected->data; data = (GstCollectData *) collected->data;
GST_LOG_OBJECT (adder, "looking into channel %p", data);
/* get pointer to copy size bytes */ /* get pointer to copy size bytes */
len = gst_collect_pads_read (pads, data, &bytes, size); len = gst_collect_pads_read (pads, data, &bytes, size);
if (len == 0) /* length 0 means EOS or an empty buffer so we still need to flush in
continue; * case of an empty buffer. */
if (len == 0) {
GST_LOG_OBJECT (adder, " copying %d bytes (format %d,%d)", GST_LOG_OBJECT (adder, "channel %p: no bytes available", data);
len, adder->format, adder->width); goto next;
GST_LOG_OBJECT (adder, " from channel %p from input data %p", data, bytes); }
if (outbuf == NULL) { if (outbuf == NULL) {
/* first buffer, alloc size bytes */ GST_LOG_OBJECT (adder, "channel %p: making output buffer of %d bytes",
outbuf = gst_buffer_new_and_alloc (size); data, size);
gst_buffer_set_caps (outbuf, GST_PAD_CAPS (adder->srcpad));
outbytes = GST_BUFFER_DATA (outbuf);
memset (outbytes, 0, size); /* first buffer, alloc size bytes. FIXME, we can easily subbuffer
* and _make_writable. */
outbuf = gst_buffer_new_and_alloc (size);
outbytes = GST_BUFFER_DATA (outbuf);
gst_buffer_set_caps (outbuf, GST_PAD_CAPS (adder->srcpad));
/* clear if we are only going to fill a partial buffer */
if (G_UNLIKELY (size > len))
memset (outbytes, 0, size);
GST_LOG_OBJECT (adder, "channel %p: copying %d bytes from data %p",
data, len, bytes);
/* and copy the data into it */ /* and copy the data into it */
memcpy (outbytes, bytes, len); memcpy (outbytes, bytes, len);
} else { } else {
GST_LOG_OBJECT (adder, "channel %p: mixing %d bytes from data %p",
data, len, bytes);
/* other buffers, need to add them */ /* other buffers, need to add them */
adder->func ((gpointer) outbytes, (gpointer) bytes, len); adder->func ((gpointer) outbytes, (gpointer) bytes, len);
} }
next:
gst_collect_pads_flush (pads, data, len); gst_collect_pads_flush (pads, data, len);
} }
/* we always timestamp in stream time */ /* can only happen when no pads to collect or all EOS */
if (outbuf == NULL)
goto eos;
/* our timestamping is very simple, just an ever incrementing
* counter, the new segment time will take care of their respective
* stream time. */
if (adder->segment_pending) { if (adder->segment_pending) {
GstEvent *event; GstEvent *event;
/* FIXME, use rate/applied_rate as set on all sinkpads.
* We could potentially figure out the duration as well using
* the current segment positions and the stated stop positions.
* Also we just start from stream time 0 which is rather
* weird. For non-synchronized mixing, the time should be
* the min of the stream times of all received segments,
* rationale being that the duration is at least going to
* be as long as the earliest stream we start mixing. This
* would also be correct for synchronized mixing but then
* the later streams would be delayed until the stream times
* match.
*/
event = gst_event_new_new_segment_full (FALSE, 1.0, event = gst_event_new_new_segment_full (FALSE, 1.0,
1.0, GST_FORMAT_TIME, adder->timestamp, -1, adder->timestamp); 1.0, GST_FORMAT_TIME, adder->timestamp, -1, 0);
gst_pad_push_event (adder->srcpad, event); gst_pad_push_event (adder->srcpad, event);
adder->segment_pending = FALSE; adder->segment_pending = FALSE;
} }
/* set timestamps on the output buffer */ /* set timestamps on the output buffer */
{ GST_BUFFER_TIMESTAMP (outbuf) = adder->timestamp;
guint64 samples; GST_BUFFER_OFFSET (outbuf) = adder->offset;
GST_BUFFER_TIMESTAMP (outbuf) = adder->timestamp; /* for the next timestamp, use the sample counter, which will
GST_BUFFER_OFFSET (outbuf) = adder->offset; * never accumulate rounding errors */
adder->offset += size / adder->bps;
adder->timestamp = gst_util_uint64_scale_int (adder->offset,
GST_SECOND, adder->rate);
/* get next timestamp */ /* now we can set the duration of the buffer */
/* width is in bits and we need bytes */ GST_BUFFER_DURATION (outbuf) = adder->timestamp -
samples = size / ((adder->width / 8) * adder->channels); GST_BUFFER_TIMESTAMP (outbuf);
adder->offset += samples;
adder->timestamp = gst_util_uint64_scale_int (adder->offset,
GST_SECOND, adder->rate);
/* now we can set the duration */
GST_BUFFER_DURATION (outbuf) = adder->timestamp -
GST_BUFFER_TIMESTAMP (outbuf);
}
/* send it out */ /* send it out */
GST_LOG_OBJECT (adder, "pushing outbuf"); GST_LOG_OBJECT (adder, "pushing outbuf, timestamp %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)));
ret = gst_pad_push (adder->srcpad, outbuf); ret = gst_pad_push (adder->srcpad, outbuf);
GST_LOG_OBJECT (adder, "pushed outbuf");
return ret; return ret;
/* ERRORS */ /* ERRORS */
not_negotiated: not_negotiated:
{ {
GST_ELEMENT_ERROR (adder, STREAM, FORMAT, (NULL),
("Unknown data received, not negotiated"));
return GST_FLOW_NOT_NEGOTIATED; return GST_FLOW_NOT_NEGOTIATED;
} }
eos:
{
GST_DEBUG_OBJECT (adder, "no data available, must be EOS");
gst_pad_push_event (adder->srcpad, gst_event_new_eos ());
return GST_FLOW_UNEXPECTED;
}
} }
static GstStateChangeReturn static GstStateChangeReturn

View file

@ -65,6 +65,9 @@ struct _GstAdder {
gint depth; gint depth;
gboolean is_signed; gboolean is_signed;
/* number of bytes per sample, actually width/8 * channels */
gint bps;
/* function to add samples */ /* function to add samples */
GstAdderFunction func; GstAdderFunction func;
@ -73,6 +76,7 @@ struct _GstAdder {
gint64 offset; gint64 offset;
/* sink event handling */ /* sink event handling */
GstPadEventFunction collect_event;
GstSegment segment; GstSegment segment;
gboolean segment_pending; gboolean segment_pending;
}; };

View file

@ -127,6 +127,9 @@ GST_START_TEST (test_event)
bus = gst_element_get_bus (bin); bus = gst_element_get_bus (bin);
gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH); gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH);
/* FIXME, fakesrc with default setting will produce 0 sized
* buffers and incompatible caps for adder that will make
* adder EOS and error out */
src1 = gst_element_factory_make ("fakesrc", "src1"); src1 = gst_element_factory_make ("fakesrc", "src1");
//g_object_set (src1, "wave", 4, NULL); /* silence */ //g_object_set (src1, "wave", 4, NULL); /* silence */
src2 = gst_element_factory_make ("fakesrc", "src2"); src2 = gst_element_factory_make ("fakesrc", "src2");
@ -161,6 +164,7 @@ GST_START_TEST (test_event)
res = gst_element_set_state (bin, GST_STATE_PAUSED); res = gst_element_set_state (bin, GST_STATE_PAUSED);
fail_unless (res != GST_STATE_CHANGE_FAILURE, NULL); fail_unless (res != GST_STATE_CHANGE_FAILURE, NULL);
/* FIXME, PAUSED is async and seek might not work before being prerolled. */
res = gst_element_send_event (bin, seek_event); res = gst_element_send_event (bin, seek_event);
fail_unless (res == TRUE, NULL); fail_unless (res == TRUE, NULL);