diff --git a/gst/audiomixer/Makefile.am b/gst/audiomixer/Makefile.am index 2a3051ddb1..b98ae8adba 100644 --- a/gst/audiomixer/Makefile.am +++ b/gst/audiomixer/Makefile.am @@ -4,7 +4,7 @@ ORC_SOURCE=gstaudiomixerorc include $(top_srcdir)/common/orc.mak -libgstaudiomixer_la_SOURCES = gstaudiomixer.c +libgstaudiomixer_la_SOURCES = gstaudiomixer.c gstaudioaggregator.c nodist_libgstaudiomixer_la_SOURCES = $(ORC_NODIST_SOURCES) libgstaudiomixer_la_CFLAGS = \ -I$(top_srcdir)/gst-libs \ @@ -18,5 +18,5 @@ libgstaudiomixer_la_LIBADD = \ $(GST_BASE_LIBS) $(GST_LIBS) $(ORC_LIBS) libgstaudiomixer_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) -noinst_HEADERS = gstaudiomixer.h +noinst_HEADERS = gstaudiomixer.h gstaudioaggregator.h diff --git a/gst/audiomixer/gstaudioaggregator.c b/gst/audiomixer/gstaudioaggregator.c new file mode 100644 index 0000000000..5939c1dc92 --- /dev/null +++ b/gst/audiomixer/gstaudioaggregator.c @@ -0,0 +1,1270 @@ +/* GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen + * 2001 Thomas + * 2005,2006 Wim Taymans + * 2013 Sebastian Dröge + * 2014 Collabora + * Olivier Crete + * + * gstaudioaggregator.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +/** + * SECTION: gstaudioaggregator + * @short_description: manages a set of pads with the purpose of + * aggregating their buffers for raw audio + * @see_also: #GstAggregator + * + */ + + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "gstaudioaggregator.h" + +#include + +GST_DEBUG_CATEGORY_STATIC (audio_aggregator_debug); +#define GST_CAT_DEFAULT audio_aggregator_debug + +struct _GstAudioAggregatorPadPrivate +{ + /* All members are protected by the pad object lock */ + + GstBuffer *buffer; /* current buffer we're mixing, + for comparison with collect.buffer + to see if we need to update our + cached values. */ + guint position, size; + + guint64 output_offset; /* Offset in output segment that + collect.pos refers to in the + current buffer. 
*/ + + guint64 next_offset; /* Next expected offset in the input segment */ + + /* Last time we noticed a discont */ + GstClockTime discont_time; + + /* A new unhandled segment event has been received */ + gboolean new_segment; +}; + + +/***************************************** + * GstAudioAggregatorPad implementation * + *****************************************/ +G_DEFINE_TYPE (GstAudioAggregatorPad, gst_audio_aggregator_pad, + GST_TYPE_AGGREGATOR_PAD); + +static gboolean +gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad, + GstAggregator * aggregator); + +static void +gst_audio_aggregator_pad_class_init (GstAudioAggregatorPadClass * klass) +{ + GstAggregatorPadClass *aggpadclass = (GstAggregatorPadClass *) klass; + + g_type_class_add_private (klass, sizeof (GstAudioAggregatorPadPrivate)); + + aggpadclass->flush = GST_DEBUG_FUNCPTR (gst_audio_aggregator_pad_flush_pad); +} + +static void +gst_audio_aggregator_pad_init (GstAudioAggregatorPad * pad) +{ + pad->priv = + G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AUDIO_AGGREGATOR_PAD, + GstAudioAggregatorPadPrivate); + + gst_audio_info_init (&pad->info); + + pad->priv->buffer = NULL; + pad->priv->position = 0; + pad->priv->size = 0; + pad->priv->output_offset = -1; + pad->priv->next_offset = -1; + pad->priv->discont_time = GST_CLOCK_TIME_NONE; +} + + +static gboolean +gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad, + GstAggregator * aggregator) +{ + GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad); + + GST_OBJECT_LOCK (aggpad); + pad->priv->position = pad->priv->size = 0; + pad->priv->output_offset = pad->priv->next_offset = -1; + pad->priv->discont_time = GST_CLOCK_TIME_NONE; + gst_buffer_replace (&pad->priv->buffer, NULL); + GST_OBJECT_UNLOCK (aggpad); + + return TRUE; +} + + + +/************************************** + * GstAudioAggregator implementation * + **************************************/ + +struct _GstAudioAggregatorPrivate +{ + GMutex mutex; + + gboolean send_caps; /* aagg lock */ + + /* All three properties are unprotected, can't be modified while streaming */ + /* Size in frames that is output per buffer */ + GstClockTime output_buffer_duration; + GstClockTime alignment_threshold; + GstClockTime discont_wait; + + /* Protected by srcpad stream clock */ + /* Buffer starting at offset containing block_size frames */ + GstBuffer *current_buffer; + + /* counters to keep track of timestamps */ + /* Readable with object lock, writable with both aag lock and object lock */ + gint64 offset; +}; + +#define GST_AUDIO_AGGREGATOR_LOCK(self) g_mutex_lock (&(self)->priv->mutex); +#define GST_AUDIO_AGGREGATOR_UNLOCK(self) g_mutex_unlock (&(self)->priv->mutex); + +static void gst_audio_aggregator_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_audio_aggregator_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_audio_aggregator_dispose (GObject * object); + +static gboolean gst_audio_aggregator_src_event (GstAggregator * agg, + GstEvent * event); +static gboolean gst_audio_aggregator_sink_event (GstAggregator * agg, + GstAggregatorPad * aggpad, GstEvent * event); +static gboolean gst_audio_aggregator_src_query (GstAggregator * agg, + GstQuery * query); +static gboolean gst_audio_aggregator_start (GstAggregator * agg); +static gboolean gst_audio_aggregator_stop (GstAggregator * agg); +static GstFlowReturn gst_audio_aggregator_flush (GstAggregator * agg); + +static GstBuffer 
*gst_audio_aggregator_create_output_buffer (GstAudioAggregator + * aagg, guint num_frames); +static GstFlowReturn gst_audio_aggregator_do_clip (GstAggregator * agg, + GstAggregatorPad * bpad, GstBuffer * buffer, GstBuffer ** outbuf); +static GstFlowReturn gst_audio_aggregator_aggregate (GstAggregator * agg, + gboolean timeout); + +#define DEFAULT_OUTPUT_BUFFER_DURATION (10 * GST_MSECOND) +#define DEFAULT_ALIGNMENT_THRESHOLD (40 * GST_MSECOND) +#define DEFAULT_DISCONT_WAIT (1 * GST_SECOND) + +enum +{ + PROP_0, + PROP_OUTPUT_BUFFER_DURATION, + PROP_ALIGNMENT_THRESHOLD, + PROP_DISCONT_WAIT, +}; + +G_DEFINE_ABSTRACT_TYPE (GstAudioAggregator, gst_audio_aggregator, + GST_TYPE_AGGREGATOR); + +static GstClockTime +gst_audio_aggregator_get_next_time (GstAggregator * agg) +{ + GstClockTime next_time; + + GST_OBJECT_LOCK (agg); + if (agg->segment.position == -1) + next_time = agg->segment.start; + else + next_time = agg->segment.position; + GST_OBJECT_UNLOCK (agg); + + return next_time; +} + +static void +gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + GstAggregatorClass *gstaggregator_class = (GstAggregatorClass *) klass; + + g_type_class_add_private (klass, sizeof (GstAudioAggregatorPrivate)); + + gobject_class->set_property = gst_audio_aggregator_set_property; + gobject_class->get_property = gst_audio_aggregator_get_property; + gobject_class->dispose = gst_audio_aggregator_dispose; + + gstaggregator_class->src_event = + GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_event); + gstaggregator_class->sink_event = + GST_DEBUG_FUNCPTR (gst_audio_aggregator_sink_event); + gstaggregator_class->src_query = + GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_query); + gstaggregator_class->start = gst_audio_aggregator_start; + gstaggregator_class->stop = gst_audio_aggregator_stop; + gstaggregator_class->flush = gst_audio_aggregator_flush; + gstaggregator_class->aggregate = + GST_DEBUG_FUNCPTR (gst_audio_aggregator_aggregate); + gstaggregator_class->clip = GST_DEBUG_FUNCPTR (gst_audio_aggregator_do_clip); + gstaggregator_class->get_next_time = gst_audio_aggregator_get_next_time; + + klass->create_output_buffer = gst_audio_aggregator_create_output_buffer; + + GST_DEBUG_CATEGORY_INIT (audio_aggregator_debug, "audioaggregator", + GST_DEBUG_FG_MAGENTA, "GstAudioAggregator"); + + g_object_class_install_property (gobject_class, PROP_OUTPUT_BUFFER_DURATION, + g_param_spec_uint64 ("output-buffer-duration", "Output Buffer Duration", + "Output block size in nanoseconds", 1, + G_MAXUINT64, DEFAULT_OUTPUT_BUFFER_DURATION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD, + g_param_spec_uint64 ("alignment-threshold", "Alignment Threshold", + "Timestamp alignment threshold in nanoseconds", 0, + G_MAXUINT64 - 1, DEFAULT_ALIGNMENT_THRESHOLD, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_DISCONT_WAIT, + g_param_spec_uint64 ("discont-wait", "Discont Wait", + "Window of time in nanoseconds to wait before " + "creating a discontinuity", 0, + G_MAXUINT64 - 1, DEFAULT_DISCONT_WAIT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} + +static void +gst_audio_aggregator_init (GstAudioAggregator * aagg) +{ + aagg->priv = + G_TYPE_INSTANCE_GET_PRIVATE (aagg, GST_TYPE_AUDIO_AGGREGATOR, + GstAudioAggregatorPrivate); + + g_mutex_init (&aagg->priv->mutex); + + aagg->priv->output_buffer_duration = DEFAULT_OUTPUT_BUFFER_DURATION; + 
aagg->priv->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD; + aagg->priv->discont_wait = DEFAULT_DISCONT_WAIT; + + aagg->current_caps = NULL; + gst_audio_info_init (&aagg->info); + + gst_aggregator_set_latency (GST_AGGREGATOR (aagg), + aagg->priv->output_buffer_duration, aagg->priv->output_buffer_duration); +} + +static void +gst_audio_aggregator_dispose (GObject * object) +{ + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object); + + gst_caps_replace (&aagg->current_caps, NULL); + + g_mutex_clear (&aagg->priv->mutex); + + G_OBJECT_CLASS (gst_audio_aggregator_parent_class)->dispose (object); +} + +static void +gst_audio_aggregator_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object); + + switch (prop_id) { + case PROP_OUTPUT_BUFFER_DURATION: + aagg->priv->output_buffer_duration = g_value_get_uint64 (value); + gst_aggregator_set_latency (GST_AGGREGATOR (aagg), + aagg->priv->output_buffer_duration, + aagg->priv->output_buffer_duration); + break; + case PROP_ALIGNMENT_THRESHOLD: + aagg->priv->alignment_threshold = g_value_get_uint64 (value); + break; + case PROP_DISCONT_WAIT: + aagg->priv->discont_wait = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_audio_aggregator_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object); + + switch (prop_id) { + case PROP_OUTPUT_BUFFER_DURATION: + g_value_set_uint64 (value, aagg->priv->output_buffer_duration); + break; + case PROP_ALIGNMENT_THRESHOLD: + g_value_set_uint64 (value, aagg->priv->alignment_threshold); + break; + case PROP_DISCONT_WAIT: + g_value_set_uint64 (value, aagg->priv->discont_wait); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + + +/* event handling */ + +static gboolean +gst_audio_aggregator_src_event (GstAggregator * agg, GstEvent * event) +{ + gboolean result; + + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg); + GST_DEBUG_OBJECT (agg->srcpad, "Got %s event on src pad", + GST_EVENT_TYPE_NAME (event)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_QOS: + /* QoS might be tricky */ + gst_event_unref (event); + return FALSE; + case GST_EVENT_NAVIGATION: + /* navigation is rather pointless. 
*/
+      gst_event_unref (event);
+      return FALSE;
+      break;
+    case GST_EVENT_SEEK:
+    {
+      GstSeekFlags flags;
+      gdouble rate;
+      GstSeekType start_type, stop_type;
+      gint64 start, stop;
+      GstFormat seek_format, dest_format;
+
+      /* parse the seek parameters */
+      gst_event_parse_seek (event, &rate, &seek_format, &flags, &start_type,
+          &start, &stop_type, &stop);
+
+      /* Check the seeking parameters before linking up */
+      if ((start_type != GST_SEEK_TYPE_NONE)
+          && (start_type != GST_SEEK_TYPE_SET)) {
+        result = FALSE;
+        GST_DEBUG_OBJECT (aagg,
+            "seeking failed, unhandled seek type for start: %d", start_type);
+        goto done;
+      }
+      if ((stop_type != GST_SEEK_TYPE_NONE) && (stop_type != GST_SEEK_TYPE_SET)) {
+        result = FALSE;
+        GST_DEBUG_OBJECT (aagg,
+            "seeking failed, unhandled seek type for end: %d", stop_type);
+        goto done;
+      }
+
+      GST_OBJECT_LOCK (agg);
+      dest_format = agg->segment.format;
+      GST_OBJECT_UNLOCK (agg);
+      if (seek_format != dest_format) {
+        result = FALSE;
+        GST_DEBUG_OBJECT (aagg,
+            "seeking failed, unhandled seek format: %s",
+            gst_format_get_name (seek_format));
+        goto done;
+      }
+    }
+      break;
+    default:
+      break;
+  }
+
+  return
+      GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_event (agg,
+      event);
+
+done:
+  return result;
+}
+
+
+static gboolean
+gst_audio_aggregator_sink_event (GstAggregator * agg,
+    GstAggregatorPad * aggpad, GstEvent * event)
+{
+  gboolean res = TRUE;
+
+  GST_DEBUG_OBJECT (aggpad, "Got %s event on sink pad",
+      GST_EVENT_TYPE_NAME (event));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEGMENT:
+    {
+      const GstSegment *segment;
+      gst_event_parse_segment (event, &segment);
+
+      if (segment->format != GST_FORMAT_TIME) {
+        GST_ERROR_OBJECT (agg, "Segments of type %s are not supported,"
+            " only TIME segments are supported",
+            gst_format_get_name (segment->format));
+        gst_event_unref (event);
+        event = NULL;
+        res = FALSE;
+        break;
+      }
+
+      GST_OBJECT_LOCK (agg);
+      if (segment->rate != agg->segment.rate) {
+        GST_ERROR_OBJECT (aggpad,
+            "Got segment event with wrong rate %lf, expected %lf",
+            segment->rate, agg->segment.rate);
+        res = FALSE;
+        gst_event_unref (event);
+        event = NULL;
+      } else if (segment->rate < 0.0) {
+        GST_ERROR_OBJECT (aggpad, "Negative rates not supported yet");
+        res = FALSE;
+        gst_event_unref (event);
+        event = NULL;
+      } else {
+        GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
+
+        GST_OBJECT_LOCK (pad);
+        pad->priv->new_segment = TRUE;
+        GST_OBJECT_UNLOCK (pad);
+      }
+      GST_OBJECT_UNLOCK (agg);
+
+      break;
+    }
+    default:
+      break;
+  }
+
+  if (event != NULL)
+    return
+        GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_event
+        (agg, aggpad, event);
+
+  return res;
+}
+
+/* FIXME, the duration query should reflect how long you will produce
+ * data, that is the amount of stream time until you will emit EOS.
+ *
+ * For synchronized mixing this is always the max of all the durations
+ * of upstream since we emit EOS when all of them finished.
+ *
+ * We don't do synchronized mixing so this really depends on where the
+ * streams were punched in and what their relative offsets are against
+ * each other which we can get from the first timestamps we see.
+ *
+ * When we add a new stream (or remove a stream) the duration might
+ * also become invalid again and we need to post a new DURATION
+ * message to notify this fact to the parent.
+ * For now we take the max of all the upstream elements so the simple
+ * cases work at least somewhat.
+ */ +static gboolean +gst_audio_aggregator_query_duration (GstAudioAggregator * aagg, + GstQuery * query) +{ + gint64 max; + gboolean res; + GstFormat format; + GstIterator *it; + gboolean done; + GValue item = { 0, }; + + /* parse format */ + gst_query_parse_duration (query, &format, NULL); + + max = -1; + res = TRUE; + done = FALSE; + + it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (aagg)); + while (!done) { + GstIteratorResult ires; + + ires = gst_iterator_next (it, &item); + switch (ires) { + case GST_ITERATOR_DONE: + done = TRUE; + break; + case GST_ITERATOR_OK: + { + GstPad *pad = g_value_get_object (&item); + gint64 duration; + + /* ask sink peer for duration */ + res &= gst_pad_peer_query_duration (pad, format, &duration); + /* take max from all valid return values */ + if (res) { + /* valid unknown length, stop searching */ + if (duration == -1) { + max = duration; + done = TRUE; + } + /* else see if bigger than current max */ + else if (duration > max) + max = duration; + } + g_value_reset (&item); + break; + } + case GST_ITERATOR_RESYNC: + max = -1; + res = TRUE; + gst_iterator_resync (it); + break; + default: + res = FALSE; + done = TRUE; + break; + } + } + g_value_unset (&item); + gst_iterator_free (it); + + if (res) { + /* and store the max */ + GST_DEBUG_OBJECT (aagg, "Total duration in format %s: %" + GST_TIME_FORMAT, gst_format_get_name (format), GST_TIME_ARGS (max)); + gst_query_set_duration (query, format, max); + } + + return res; +} + + +static gboolean +gst_audio_aggregator_src_query (GstAggregator * agg, GstQuery * query) +{ + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg); + gboolean res = FALSE; + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_DURATION: + res = gst_audio_aggregator_query_duration (aagg, query); + break; + case GST_QUERY_POSITION: + { + GstFormat format; + + gst_query_parse_position (query, &format, NULL); + + GST_OBJECT_LOCK (aagg); + + switch (format) { + case GST_FORMAT_TIME: + /* FIXME, bring to stream time, might be tricky */ + gst_query_set_position (query, format, agg->segment.position); + res = TRUE; + break; + case GST_FORMAT_BYTES: + if (GST_AUDIO_INFO_BPF (&aagg->info)) { + gst_query_set_position (query, format, aagg->priv->offset * + GST_AUDIO_INFO_BPF (&aagg->info)); + res = TRUE; + } + break; + case GST_FORMAT_DEFAULT: + gst_query_set_position (query, format, aagg->priv->offset); + res = TRUE; + break; + default: + break; + } + + GST_OBJECT_UNLOCK (aagg); + + break; + } + default: + res = + GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_query + (agg, query); + break; + } + + return res; +} + + +void +gst_audio_aggregator_set_sink_caps (GstAudioAggregator * aagg, + GstAudioAggregatorPad * pad, GstCaps * caps) +{ + GST_OBJECT_LOCK (pad); + gst_audio_info_from_caps (&pad->info, caps); + GST_OBJECT_UNLOCK (pad); +} + + +gboolean +gst_audio_aggregator_set_src_caps (GstAudioAggregator * aagg, GstCaps * caps) +{ + GstAudioInfo info; + + if (!gst_audio_info_from_caps (&info, caps)) { + GST_WARNING_OBJECT (aagg, "Rejecting invalid caps: %" GST_PTR_FORMAT, caps); + return FALSE; + } + + GST_AUDIO_AGGREGATOR_LOCK (aagg); + GST_OBJECT_LOCK (aagg); + + GST_INFO_OBJECT (aagg, "setting caps to %" GST_PTR_FORMAT, caps); + gst_caps_replace (&aagg->current_caps, caps); + + memcpy (&aagg->info, &info, sizeof (info)); + aagg->priv->send_caps = TRUE; + + GST_OBJECT_UNLOCK (aagg); + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); + + /* send caps event later, after stream-start event */ + + return TRUE; +} + + +/* Must hold object 
lock and aagg lock to call */ + +static void +gst_audio_aggregator_reset (GstAudioAggregator * aagg) +{ + GstAggregator *agg = GST_AGGREGATOR (aagg); + + GST_AUDIO_AGGREGATOR_LOCK (aagg); + GST_OBJECT_LOCK (aagg); + agg->segment.position = -1; + aagg->priv->offset = 0; + gst_audio_info_init (&aagg->info); + gst_caps_replace (&aagg->current_caps, NULL); + gst_buffer_replace (&aagg->priv->current_buffer, NULL); + GST_OBJECT_UNLOCK (aagg); + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); +} + +static gboolean +gst_audio_aggregator_start (GstAggregator * agg) +{ + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg); + + gst_audio_aggregator_reset (aagg); + + return TRUE; +} + +static gboolean +gst_audio_aggregator_stop (GstAggregator * agg) +{ + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg); + + gst_audio_aggregator_reset (aagg); + + return TRUE; +} + +static GstFlowReturn +gst_audio_aggregator_flush (GstAggregator * agg) +{ + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg); + + GST_AUDIO_AGGREGATOR_LOCK (aagg); + GST_OBJECT_LOCK (aagg); + agg->segment.position = -1; + aagg->priv->offset = 0; + gst_buffer_replace (&aagg->priv->current_buffer, NULL); + GST_OBJECT_UNLOCK (aagg); + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); + + return GST_FLOW_OK; +} + +static GstFlowReturn +gst_audio_aggregator_do_clip (GstAggregator * agg, + GstAggregatorPad * bpad, GstBuffer * buffer, GstBuffer ** out) +{ + GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (bpad); + gint rate, bpf; + + + rate = GST_AUDIO_INFO_RATE (&pad->info); + bpf = GST_AUDIO_INFO_BPF (&pad->info); + + GST_OBJECT_LOCK (bpad); + *out = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf); + GST_OBJECT_UNLOCK (bpad); + + return GST_FLOW_OK; +} + +/* Called with the object lock for both the element and pad held, + * as well as the aagg lock + */ +static gboolean +gst_audio_aggregator_fill_buffer (GstAudioAggregator * aagg, + GstAudioAggregatorPad * pad, GstBuffer * inbuf) +{ + GstClockTime start_time, end_time; + gboolean discont = FALSE; + guint64 start_offset, end_offset; + GstClockTime timestamp, stream_time = GST_CLOCK_TIME_NONE; + gint rate, bpf; + + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + g_assert (pad->priv->buffer == NULL); + + rate = GST_AUDIO_INFO_RATE (&pad->info); + bpf = GST_AUDIO_INFO_BPF (&pad->info); + + pad->priv->position = 0; + pad->priv->size = gst_buffer_get_size (inbuf) / bpf; + + if (!GST_BUFFER_PTS_IS_VALID (inbuf)) { + if (pad->priv->output_offset == -1) + pad->priv->output_offset = aagg->priv->offset; + if (pad->priv->next_offset == -1) + pad->priv->next_offset = pad->priv->size; + else + pad->priv->next_offset += pad->priv->size; + goto done; + } + + timestamp = GST_BUFFER_PTS (inbuf); + stream_time = gst_segment_to_stream_time (&aggpad->segment, GST_FORMAT_TIME, + timestamp); + + /* sync object properties on stream time */ + /* TODO: Ideally we would want to do that on every sample */ + if (GST_CLOCK_TIME_IS_VALID (stream_time)) + gst_object_sync_values (GST_OBJECT (pad), stream_time); + + start_time = GST_BUFFER_PTS (inbuf); + end_time = + start_time + gst_util_uint64_scale_ceil (pad->priv->size, GST_SECOND, + rate); + + start_offset = gst_util_uint64_scale (start_time, rate, GST_SECOND); + end_offset = start_offset + pad->priv->size; + + if (GST_BUFFER_IS_DISCONT (inbuf) + || GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_RESYNC) + || pad->priv->new_segment || pad->priv->next_offset == -1) { + discont = TRUE; + pad->priv->new_segment = FALSE; + } else { + guint64 diff, max_sample_diff; + + 
/* Check discont, based on audiobasesink */ + if (start_offset <= pad->priv->next_offset) + diff = pad->priv->next_offset - start_offset; + else + diff = start_offset - pad->priv->next_offset; + + max_sample_diff = + gst_util_uint64_scale_int (aagg->priv->alignment_threshold, rate, + GST_SECOND); + + /* Discont! */ + if (G_UNLIKELY (diff >= max_sample_diff)) { + if (aagg->priv->discont_wait > 0) { + if (pad->priv->discont_time == GST_CLOCK_TIME_NONE) { + pad->priv->discont_time = start_time; + } else if (start_time - pad->priv->discont_time >= + aagg->priv->discont_wait) { + discont = TRUE; + pad->priv->discont_time = GST_CLOCK_TIME_NONE; + } + } else { + discont = TRUE; + } + } else if (G_UNLIKELY (pad->priv->discont_time != GST_CLOCK_TIME_NONE)) { + /* we have had a discont, but are now back on track! */ + pad->priv->discont_time = GST_CLOCK_TIME_NONE; + } + } + + if (discont) { + /* Have discont, need resync */ + if (pad->priv->next_offset != -1) + GST_INFO_OBJECT (pad, "Have discont. Expected %" + G_GUINT64_FORMAT ", got %" G_GUINT64_FORMAT, + pad->priv->next_offset, start_offset); + pad->priv->output_offset = -1; + pad->priv->next_offset = end_offset; + } else { + pad->priv->next_offset += pad->priv->size; + } + + if (pad->priv->output_offset == -1) { + GstClockTime start_running_time; + GstClockTime end_running_time; + guint64 start_running_time_offset; + guint64 end_running_time_offset; + + start_running_time = + gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, start_time); + end_running_time = + gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, end_time); + start_running_time_offset = + gst_util_uint64_scale (start_running_time, rate, GST_SECOND); + end_running_time_offset = + gst_util_uint64_scale (end_running_time, rate, GST_SECOND); + + if (end_running_time_offset < aagg->priv->offset) { + /* Before output segment, drop */ + gst_buffer_unref (inbuf); + pad->priv->buffer = NULL; + pad->priv->position = 0; + pad->priv->size = 0; + pad->priv->output_offset = -1; + GST_DEBUG_OBJECT (pad, + "Buffer before segment or current position: %" G_GUINT64_FORMAT " < %" + G_GUINT64_FORMAT, end_running_time_offset, aagg->priv->offset); + return FALSE; + } + + if (start_running_time_offset < aagg->priv->offset) { + guint diff = aagg->priv->offset - start_running_time_offset; + + pad->priv->position += diff; + if (pad->priv->position >= pad->priv->size) { + /* Empty buffer, drop */ + gst_buffer_unref (inbuf); + pad->priv->buffer = NULL; + pad->priv->position = 0; + pad->priv->size = 0; + pad->priv->output_offset = -1; + GST_DEBUG_OBJECT (pad, + "Buffer before segment or current position: %" G_GUINT64_FORMAT + " < %" G_GUINT64_FORMAT, end_running_time_offset, + aagg->priv->offset); + return FALSE; + } + } + + pad->priv->output_offset = + MAX (start_running_time_offset, aagg->priv->offset); + GST_DEBUG_OBJECT (pad, + "Buffer resynced: Pad offset %" G_GUINT64_FORMAT + ", current audio aggregator offset %" G_GUINT64_FORMAT, + pad->priv->output_offset, aagg->priv->offset); + } + +done: + + GST_LOG_OBJECT (pad, + "Queued new buffer at offset %" G_GUINT64_FORMAT, + pad->priv->output_offset); + pad->priv->buffer = inbuf; + + return TRUE; +} + +/* Called with pad object lock held */ + +static gboolean +gst_audio_aggregator_mix_buffer (GstAudioAggregator * aagg, + GstAudioAggregatorPad * pad, GstBuffer * inbuf, GstBuffer * outbuf) +{ + guint overlap; + guint out_start; + gboolean filled; + guint blocksize; + + blocksize = gst_util_uint64_scale 
(aagg->priv->output_buffer_duration, + GST_AUDIO_INFO_RATE (&aagg->info), GST_SECOND); + blocksize = MAX (1, blocksize); + + /* Overlap => mix */ + if (aagg->priv->offset < pad->priv->output_offset) + out_start = pad->priv->output_offset - aagg->priv->offset; + else + out_start = 0; + + overlap = pad->priv->size - pad->priv->position; + if (overlap > blocksize - out_start) + overlap = blocksize - out_start; + + if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) { + /* skip gap buffer */ + GST_LOG_OBJECT (pad, "skipping GAP buffer"); + pad->priv->output_offset += pad->priv->size; + pad->priv->position = pad->priv->size; + + gst_buffer_replace (&pad->priv->buffer, NULL); + return FALSE; + } + + filled = GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->aggregate_one_buffer (aagg, + pad, inbuf, pad->priv->position, outbuf, out_start, overlap); + + if (filled) + GST_BUFFER_FLAG_UNSET (outbuf, GST_BUFFER_FLAG_GAP); + + pad->priv->position += overlap; + pad->priv->output_offset += overlap; + + if (pad->priv->position == pad->priv->size) { + /* Buffer done, drop it */ + gst_buffer_replace (&pad->priv->buffer, NULL); + GST_DEBUG_OBJECT (pad, "Finished mixing buffer, waiting for next"); + return FALSE; + } + + return TRUE; +} + +static GstBuffer * +gst_audio_aggregator_create_output_buffer (GstAudioAggregator * aagg, + guint num_frames) +{ + GstBuffer *outbuf = gst_buffer_new_and_alloc (num_frames * + GST_AUDIO_INFO_BPF (&aagg->info)); + GstMapInfo outmap; + + gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE); + gst_audio_format_fill_silence (aagg->info.finfo, outmap.data, outmap.size); + gst_buffer_unmap (outbuf, &outmap); + + return outbuf; +} + +static GstFlowReturn +gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout) +{ + /* Get all pads that have data for us and store them in a + * new list. + * + * Calculate the current output offset/timestamp and + * offset_end/timestamp_end. Allocate a silence buffer + * for this and store it. + * + * For all pads: + * 1) Once per input buffer (cached) + * 1) Check discont (flag and timestamp with tolerance) + * 2) If discont or new, resync. That means: + * 1) Drop all start data of the buffer that comes before + * the current position/offset. + * 2) Calculate the offset (output segment!) that the first + * frame of the input buffer corresponds to. Base this on + * the running time. + * + * 2) If the current pad's offset/offset_end overlaps with the output + * offset/offset_end, mix it at the appropiate position in the output + * buffer and advance the pad's position. Remember if this pad needs + * a new buffer to advance behind the output offset_end. + * + * 3) If we had no pad with a buffer, go EOS. + * + * 4) If we had at least one pad that did not advance behind output + * offset_end, let collected be called again for the current + * output offset/offset_end. 
+ */ + GstElement *element; + GstAudioAggregator *aagg; + GList *iter; + GstFlowReturn ret; + GstBuffer *outbuf = NULL; + gint64 next_offset; + gint64 next_timestamp; + gint rate, bpf; + gboolean dropped = FALSE; + gboolean is_eos = TRUE; + gboolean is_done = TRUE; + guint blocksize; + + element = GST_ELEMENT (agg); + aagg = GST_AUDIO_AGGREGATOR (agg); + + blocksize = gst_util_uint64_scale (aagg->priv->output_buffer_duration, + GST_AUDIO_INFO_RATE (&aagg->info), GST_SECOND); + blocksize = MAX (1, blocksize); + + GST_AUDIO_AGGREGATOR_LOCK (aagg); + GST_OBJECT_LOCK (agg); + + /* Update position from the segment start/stop if needed */ + if (agg->segment.position == -1) { + if (agg->segment.rate > 0.0) + agg->segment.position = agg->segment.start; + else + agg->segment.position = agg->segment.stop; + } + + if (G_UNLIKELY (aagg->info.finfo->format == GST_AUDIO_FORMAT_UNKNOWN)) { + if (timeout) { + GST_DEBUG_OBJECT (aagg, + "Got timeout before receiving any caps, don't output anything"); + + /* Advance position */ + if (agg->segment.rate > 0.0) + agg->segment.position += aagg->priv->output_buffer_duration; + else if (agg->segment.position > aagg->priv->output_buffer_duration) + agg->segment.position -= aagg->priv->output_buffer_duration; + else + agg->segment.position = 0; + + GST_OBJECT_UNLOCK (agg); + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); + return GST_FLOW_OK; + } else { + GST_OBJECT_UNLOCK (agg); + goto not_negotiated; + } + } + + if (aagg->priv->send_caps) { + GST_OBJECT_UNLOCK (agg); + gst_aggregator_set_src_caps (agg, aagg->current_caps); + GST_OBJECT_LOCK (agg); + aagg->priv->offset = gst_util_uint64_scale (agg->segment.position, + GST_AUDIO_INFO_RATE (&aagg->info), GST_SECOND); + + aagg->priv->send_caps = FALSE; + } + + + rate = GST_AUDIO_INFO_RATE (&aagg->info); + bpf = GST_AUDIO_INFO_BPF (&aagg->info); + + + /* for the next timestamp, use the sample counter, which will + * never accumulate rounding errors */ + + /* FIXME: Reverse mixing does not work at all yet */ + if (agg->segment.rate > 0.0) { + next_offset = aagg->priv->offset + blocksize; + } else { + next_offset = aagg->priv->offset - blocksize; + } + + next_timestamp = gst_util_uint64_scale (next_offset, GST_SECOND, rate); + + if (aagg->priv->current_buffer == NULL) { + GST_OBJECT_UNLOCK (agg); + aagg->priv->current_buffer = + GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->create_output_buffer (aagg, + blocksize); + /* Be careful, some things could have changed ? 
*/ + GST_OBJECT_LOCK (agg); + GST_BUFFER_FLAG_SET (aagg->priv->current_buffer, GST_BUFFER_FLAG_GAP); + } + outbuf = aagg->priv->current_buffer; + + GST_LOG_OBJECT (agg, + "Starting to mix %u samples for offset %" G_GUINT64_FORMAT + " with timestamp %" GST_TIME_FORMAT, blocksize, + aagg->priv->offset, GST_TIME_ARGS (agg->segment.position)); + + for (iter = element->sinkpads; iter; iter = iter->next) { + GstBuffer *inbuf; + GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) iter->data; + GstAggregatorPad *aggpad = (GstAggregatorPad *) iter->data; + gboolean drop_buf = FALSE; + gboolean pad_eos = gst_aggregator_pad_is_eos (aggpad); + + if (!pad_eos) + is_eos = FALSE; + + inbuf = gst_aggregator_pad_get_buffer (aggpad); + + GST_OBJECT_LOCK (pad); + if (!inbuf) { + if (timeout) { + if (pad->priv->output_offset < next_offset) { + gint64 diff = next_offset - pad->priv->output_offset; + GST_LOG_OBJECT (pad, "Timeout, missing %" G_GINT64_FORMAT " frames (%" + GST_TIME_FORMAT ")", diff, + GST_TIME_ARGS (gst_util_uint64_scale (diff, GST_SECOND, + GST_AUDIO_INFO_RATE (&aagg->info)))); + } + } else if (!pad_eos) { + is_done = FALSE; + } + GST_OBJECT_UNLOCK (pad); + continue; + } + + g_assert (!pad->priv->buffer || pad->priv->buffer == inbuf); + + /* New buffer? */ + if (!pad->priv->buffer) { + /* Takes ownership of buffer */ + if (!gst_audio_aggregator_fill_buffer (aagg, pad, inbuf)) { + dropped = TRUE; + GST_OBJECT_UNLOCK (pad); + gst_aggregator_pad_drop_buffer (aggpad); + continue; + } + } else { + gst_buffer_unref (inbuf); + } + + if (!pad->priv->buffer && !dropped && pad_eos) { + GST_DEBUG_OBJECT (aggpad, "Pad is in EOS state"); + GST_OBJECT_UNLOCK (pad); + continue; + } + + g_assert (pad->priv->buffer); + + /* This pad is lacking behind, we need to update the offset + * and maybe drop the current buffer */ + if (pad->priv->output_offset < aagg->priv->offset) { + gint64 diff = aagg->priv->offset - pad->priv->output_offset; + + if (pad->priv->position + diff > pad->priv->size) + diff = pad->priv->size - pad->priv->position; + pad->priv->position += diff; + pad->priv->output_offset += diff; + + if (pad->priv->position == pad->priv->size) { + /* Buffer done, drop it */ + gst_buffer_replace (&pad->priv->buffer, NULL); + dropped = TRUE; + GST_OBJECT_UNLOCK (pad); + gst_aggregator_pad_drop_buffer (aggpad); + continue; + } + } + + + if (pad->priv->output_offset >= aagg->priv->offset + && pad->priv->output_offset < + aagg->priv->offset + blocksize && pad->priv->buffer) { + GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset"); + drop_buf = !gst_audio_aggregator_mix_buffer (aagg, pad, pad->priv->buffer, + outbuf); + if (pad->priv->output_offset >= next_offset) { + GST_DEBUG_OBJECT (pad, + "Pad is after current offset: %" G_GUINT64_FORMAT " >= %" + G_GUINT64_FORMAT, pad->priv->output_offset, next_offset); + } else { + is_done = FALSE; + } + } + + GST_OBJECT_UNLOCK (pad); + if (drop_buf) + gst_aggregator_pad_drop_buffer (aggpad); + + } + GST_OBJECT_UNLOCK (agg); + + if (dropped) { + /* We dropped a buffer, retry */ + GST_INFO_OBJECT (aagg, "A pad dropped a buffer, wait for the next one"); + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); + return GST_FLOW_OK; + } + + if (!is_done && !is_eos) { + /* Get more buffers */ + GST_INFO_OBJECT (aagg, + "We're not done yet for the current offset," " waiting for more data"); + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); + return GST_FLOW_OK; + } + + if (is_eos) { + gint64 max_offset = 0; + + GST_DEBUG_OBJECT (aagg, "We're EOS"); + + GST_OBJECT_LOCK (agg); + for (iter = 
GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) { + GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data); + + max_offset = MAX ((gint64) max_offset, (gint64) pad->priv->output_offset); + } + GST_OBJECT_UNLOCK (agg); + + /* This means EOS or nothing mixed in at all */ + if (aagg->priv->offset == max_offset) { + gst_buffer_replace (&aagg->priv->current_buffer, NULL); + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); + return GST_FLOW_EOS; + } + + if (max_offset <= next_offset) { + GST_DEBUG_OBJECT (aagg, + "Last buffer is incomplete: %" G_GUINT64_FORMAT " <= %" + G_GUINT64_FORMAT, max_offset, next_offset); + next_offset = max_offset; + next_timestamp = gst_util_uint64_scale (next_offset, GST_SECOND, rate); + + if (next_offset > aagg->priv->offset) + gst_buffer_resize (outbuf, 0, (next_offset - aagg->priv->offset) * bpf); + } + } + + /* set timestamps on the output buffer */ + GST_OBJECT_LOCK (agg); + if (agg->segment.rate > 0.0) { + GST_BUFFER_PTS (outbuf) = agg->segment.position; + GST_BUFFER_OFFSET (outbuf) = aagg->priv->offset; + GST_BUFFER_OFFSET_END (outbuf) = next_offset; + GST_BUFFER_DURATION (outbuf) = next_timestamp - agg->segment.position; + } else { + GST_BUFFER_PTS (outbuf) = next_timestamp; + GST_BUFFER_OFFSET (outbuf) = next_offset; + GST_BUFFER_OFFSET_END (outbuf) = aagg->priv->offset; + GST_BUFFER_DURATION (outbuf) = agg->segment.position - next_timestamp; + } + + aagg->priv->offset = next_offset; + agg->segment.position = next_timestamp; + + GST_OBJECT_UNLOCK (agg); + + /* send it out */ + GST_LOG_OBJECT (aagg, + "pushing outbuf %p, timestamp %" GST_TIME_FORMAT " offset %" + G_GINT64_FORMAT, outbuf, GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)), + GST_BUFFER_OFFSET (outbuf)); + + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); + + ret = gst_aggregator_finish_buffer (agg, aagg->priv->current_buffer); + aagg->priv->current_buffer = NULL; + + GST_LOG_OBJECT (aagg, "pushed outbuf, result = %s", gst_flow_get_name (ret)); + + return ret; + /* ERRORS */ +not_negotiated: + { + GST_AUDIO_AGGREGATOR_UNLOCK (aagg); + GST_ELEMENT_ERROR (aagg, STREAM, FORMAT, (NULL), + ("Unknown data received, not negotiated")); + return GST_FLOW_NOT_NEGOTIATED; + } +} diff --git a/gst/audiomixer/gstaudioaggregator.h b/gst/audiomixer/gstaudioaggregator.h new file mode 100644 index 0000000000..304bad2871 --- /dev/null +++ b/gst/audiomixer/gstaudioaggregator.h @@ -0,0 +1,171 @@ +/* GStreamer + * Copyright (C) 2014 Collabora + * Author: Olivier Crete + * + * gstaudioaggregator.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_AUDIO_AGGREGATOR_H__ +#define __GST_AUDIO_AGGREGATOR_H__ + +#ifndef GST_USE_UNSTABLE_API +#warning "The Base library from gst-plugins-bad is unstable API and may change in future." +#warning "You can define GST_USE_UNSTABLE_API to avoid this warning." 
+#endif
+
+#include <gst/gst.h>
+#include <gst/base/gstaggregator.h>
+#include <gst/audio/audio.h>
+
+G_BEGIN_DECLS
+
+/*******************************
+ * GstAudioAggregator Structs  *
+ *******************************/
+
+typedef struct _GstAudioAggregator GstAudioAggregator;
+typedef struct _GstAudioAggregatorPrivate GstAudioAggregatorPrivate;
+typedef struct _GstAudioAggregatorClass GstAudioAggregatorClass;
+
+
+/************************
+ * GstAudioAggregatorPad API *
+ ***********************/
+
+#define GST_TYPE_AUDIO_AGGREGATOR_PAD (gst_audio_aggregator_pad_get_type())
+#define GST_AUDIO_AGGREGATOR_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AUDIO_AGGREGATOR_PAD, GstAudioAggregatorPad))
+#define GST_AUDIO_AGGREGATOR_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AUDIO_AGGREGATOR_PAD, GstAudioAggregatorPadClass))
+#define GST_AUDIO_AGGREGATOR_PAD_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_AUDIO_AGGREGATOR_PAD, GstAudioAggregatorPadClass))
+#define GST_IS_AUDIO_AGGREGATOR_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AUDIO_AGGREGATOR_PAD))
+#define GST_IS_AUDIO_AGGREGATOR_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AUDIO_AGGREGATOR_PAD))
+
+/****************************
+ * GstAudioAggregatorPad Structs *
+ ***************************/
+
+typedef struct _GstAudioAggregatorPad GstAudioAggregatorPad;
+typedef struct _GstAudioAggregatorPadClass GstAudioAggregatorPadClass;
+typedef struct _GstAudioAggregatorPadPrivate GstAudioAggregatorPadPrivate;
+
+/**
+ * GstAudioAggregatorPad:
+ * @parent: The parent #GstAggregatorPad
+ * @info: The audio info for this pad set from the incoming caps
+ *
+ * The implementation of the GstPad to use with #GstAudioAggregator
+ */
+struct _GstAudioAggregatorPad
+{
+  GstAggregatorPad parent;
+
+  GstAudioInfo info;
+
+  /*< private >*/
+  GstAudioAggregatorPadPrivate * priv;
+
+  gpointer _gst_reserved[GST_PADDING];
+};
+
+/**
+ * GstAudioAggregatorPadClass:
+ *
+ */
+struct _GstAudioAggregatorPadClass
+{
+  GstAggregatorPadClass parent_class;
+
+  /*< private >*/
+  gpointer _gst_reserved[GST_PADDING];
+};
+
+GType gst_audio_aggregator_pad_get_type (void);
+
+/**************************
+ * GstAudioAggregator API *
+ **************************/
+
+#define GST_TYPE_AUDIO_AGGREGATOR (gst_audio_aggregator_get_type())
+#define GST_AUDIO_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AUDIO_AGGREGATOR,GstAudioAggregator))
+#define GST_AUDIO_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AUDIO_AGGREGATOR,GstAudioAggregatorClass))
+#define GST_AUDIO_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_AUDIO_AGGREGATOR,GstAudioAggregatorClass))
+#define GST_IS_AUDIO_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AUDIO_AGGREGATOR))
+#define GST_IS_AUDIO_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AUDIO_AGGREGATOR))
+
+#define GST_FLOW_CUSTOM_SUCCESS GST_FLOW_NOT_HANDLED
+
+/**
+ * GstAudioAggregator:
+ * @parent: The parent #GstAggregator
+ * @info: The information parsed from the current caps
+ * @current_caps: The caps set by the subclass
+ *
+ * GstAudioAggregator object
+ */
+struct _GstAudioAggregator
+{
+  GstAggregator parent;
+
+  /* All members are read only for subclasses, must hold OBJECT lock */
+  GstAudioInfo info;
+
+  GstCaps *current_caps;
+
+  /*< private >*/
+  GstAudioAggregatorPrivate *priv;
+
+  gpointer _gst_reserved[GST_PADDING];
+};
+
+/**
+ * GstAudioAggregatorClass:
+ * @create_output_buffer: Create a new output buffer containing num_frames frames.
+ * @aggregate_one_buffer: Aggregates one input buffer to the output + * buffer. The in_offset and out_offset are in "frames", which is + * the size of a sample times the number of channels. Returns TRUE if + * any non-silence was added to the buffer + */ +struct _GstAudioAggregatorClass { + GstAggregatorClass parent_class; + + GstBuffer * (* create_output_buffer) (GstAudioAggregator * aagg, + guint num_frames); + gboolean (* aggregate_one_buffer) (GstAudioAggregator * aagg, + GstAudioAggregatorPad * pad, GstBuffer * inbuf, guint in_offset, + GstBuffer * outbuf, guint out_offset, guint num_frames); + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + +/************************* + * GstAggregator methods * + ************************/ + +GType gst_audio_aggregator_get_type(void); + +void +gst_audio_aggregator_set_sink_caps (GstAudioAggregator * aagg, + GstAudioAggregatorPad * pad, GstCaps * caps); + +gboolean +gst_audio_aggregator_set_src_caps (GstAudioAggregator * aagg, GstCaps * caps); + + +G_END_DECLS + +#endif /* __GST_AUDIO_AGGREGATOR_H__ */ diff --git a/gst/audiomixer/gstaudiomixer.c b/gst/audiomixer/gstaudiomixer.c index d1daff6526..3ff370690c 100644 --- a/gst/audiomixer/gstaudiomixer.c +++ b/gst/audiomixer/gstaudiomixer.c @@ -86,7 +86,8 @@ enum PROP_PAD_MUTE }; -G_DEFINE_TYPE (GstAudioMixerPad, gst_audiomixer_pad, GST_TYPE_AGGREGATOR_PAD); +G_DEFINE_TYPE (GstAudioMixerPad, gst_audiomixer_pad, + GST_TYPE_AUDIO_AGGREGATOR_PAD); static void gst_audiomixer_pad_get_property (GObject * object, guint prop_id, @@ -133,27 +134,10 @@ gst_audiomixer_pad_set_property (GObject * object, guint prop_id, } } -static gboolean -gst_audiomixer_pad_flush_pad (GstAggregatorPad * aggpad, - GstAggregator * aggregator) -{ - GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (aggpad); - - GST_OBJECT_LOCK (aggpad); - pad->position = pad->size = 0; - pad->output_offset = pad->next_offset = -1; - pad->discont_time = GST_CLOCK_TIME_NONE; - gst_buffer_replace (&pad->buffer, NULL); - GST_OBJECT_UNLOCK (aggpad); - - return TRUE; -} - static void gst_audiomixer_pad_class_init (GstAudioMixerPadClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; - GstAggregatorPadClass *aggpadclass = (GstAggregatorPadClass *) klass; gobject_class->set_property = gst_audiomixer_pad_set_property; gobject_class->get_property = gst_audiomixer_pad_get_property; @@ -166,8 +150,6 @@ gst_audiomixer_pad_class_init (GstAudioMixerPadClass * klass) g_param_spec_boolean ("mute", "Mute", "Mute this pad", DEFAULT_PAD_MUTE, G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS)); - - aggpadclass->flush = GST_DEBUG_FUNCPTR (gst_audiomixer_pad_flush_pad); } static void @@ -175,26 +157,12 @@ gst_audiomixer_pad_init (GstAudioMixerPad * pad) { pad->volume = DEFAULT_PAD_VOLUME; pad->mute = DEFAULT_PAD_MUTE; - - pad->buffer = NULL; - pad->position = 0; - pad->size = 0; - pad->output_offset = -1; - pad->next_offset = -1; - pad->discont_time = GST_CLOCK_TIME_NONE; } -#define DEFAULT_ALIGNMENT_THRESHOLD (40 * GST_MSECOND) -#define DEFAULT_DISCONT_WAIT (1 * GST_SECOND) -#define DEFAULT_OUTPUT_BUFFER_DURATION (10 * GST_MSECOND) - enum { PROP_0, - PROP_FILTER_CAPS, - PROP_ALIGNMENT_THRESHOLD, - PROP_DISCONT_WAIT, - PROP_OUTPUT_BUFFER_DURATION + PROP_FILTER_CAPS }; /* elementfactory information */ @@ -227,8 +195,8 @@ static void gst_audiomixer_child_proxy_init (gpointer g_iface, gpointer iface_data); #define gst_audiomixer_parent_class parent_class -G_DEFINE_TYPE_WITH_CODE (GstAudioMixer, gst_audiomixer, GST_TYPE_AGGREGATOR, - 
G_IMPLEMENT_INTERFACE (GST_TYPE_CHILD_PROXY, +G_DEFINE_TYPE_WITH_CODE (GstAudioMixer, gst_audiomixer, + GST_TYPE_AUDIO_AGGREGATOR, G_IMPLEMENT_INTERFACE (GST_TYPE_CHILD_PROXY, gst_audiomixer_child_proxy_init)); static void gst_audiomixer_dispose (GObject * object); @@ -243,35 +211,27 @@ static GstPad *gst_audiomixer_request_new_pad (GstElement * element, GstPadTemplate * temp, const gchar * req_name, const GstCaps * caps); static void gst_audiomixer_release_pad (GstElement * element, GstPad * pad); -static GstFlowReturn -gst_audiomixer_do_clip (GstAggregator * agg, - GstAggregatorPad * bpad, GstBuffer * buffer, GstBuffer ** outbuf); -static GstFlowReturn gst_audiomixer_aggregate (GstAggregator * agg, - gboolean timeout); +static gboolean +gst_audiomixer_aggregate_one_buffer (GstAudioAggregator * aagg, + GstAudioAggregatorPad * aaggpad, GstBuffer * inbuf, guint in_offset, + GstBuffer * outbuf, guint out_offset, guint num_samples); -static GstClockTime -gst_audiomixer_get_next_time (GstAggregator * agg) -{ - if (agg->segment.position == -1) - return agg->segment.start; - else - return agg->segment.position; -} /* we can only accept caps that we and downstream can handle. * if we have filtercaps set, use those to constrain the target caps. */ static GstCaps * -gst_audiomixer_sink_getcaps (GstPad * pad, GstCaps * filter) +gst_audiomixer_sink_getcaps (GstAggregator * agg, GstPad * pad, + GstCaps * filter) { - GstAggregator *agg; + GstAudioAggregator *aagg; GstAudioMixer *audiomixer; GstCaps *result, *peercaps, *current_caps, *filter_caps; GstStructure *s; gint i, n; - audiomixer = GST_AUDIO_MIXER (GST_PAD_PARENT (pad)); - agg = GST_AGGREGATOR (audiomixer); + audiomixer = GST_AUDIO_MIXER (agg); + aagg = GST_AUDIO_AGGREGATOR (agg); GST_OBJECT_LOCK (audiomixer); /* take filter */ @@ -297,8 +257,7 @@ gst_audiomixer_sink_getcaps (GstPad * pad, GstCaps * filter) /* get the allowed caps on this sinkpad */ GST_OBJECT_LOCK (audiomixer); - current_caps = - audiomixer->current_caps ? gst_caps_ref (audiomixer->current_caps) : NULL; + current_caps = aagg->current_caps ? gst_caps_ref (aagg->current_caps) : NULL; if (current_caps == NULL) { current_caps = gst_pad_get_pad_template_caps (pad); if (!current_caps) @@ -367,7 +326,7 @@ gst_audiomixer_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad, GstCaps *filter, *caps; gst_query_parse_caps (query, &filter); - caps = gst_audiomixer_sink_getcaps (GST_PAD (aggpad), filter); + caps = gst_audiomixer_sink_getcaps (agg, GST_PAD (aggpad), filter); gst_query_set_caps_result (query, caps); gst_caps_unref (caps); res = TRUE; @@ -389,10 +348,12 @@ static gboolean gst_audiomixer_setcaps (GstAudioMixer * audiomixer, GstPad * pad, GstCaps * orig_caps) { + GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (audiomixer); GstCaps *caps; GstAudioInfo info; GstStructure *s; gint channels; + gboolean ret; caps = gst_caps_copy (orig_caps); @@ -409,34 +370,35 @@ gst_audiomixer_setcaps (GstAudioMixer * audiomixer, GstPad * pad, * different upstream threads doing query_caps + accept_caps + sending * (possibly different) CAPS events, but there's not much we can do about * that, upstream needs to deal with it. 
*/ - if (audiomixer->current_caps != NULL) { - if (gst_audio_info_is_equal (&info, &audiomixer->info)) { + if (aagg->current_caps != NULL) { + if (gst_audio_info_is_equal (&info, &aagg->info)) { GST_OBJECT_UNLOCK (audiomixer); gst_caps_unref (caps); + gst_audio_aggregator_set_sink_caps (aagg, GST_AUDIO_AGGREGATOR_PAD (pad), + orig_caps); return TRUE; } else { GST_DEBUG_OBJECT (pad, "got input caps %" GST_PTR_FORMAT ", but " - "current caps are %" GST_PTR_FORMAT, caps, audiomixer->current_caps); + "current caps are %" GST_PTR_FORMAT, caps, aagg->current_caps); GST_OBJECT_UNLOCK (audiomixer); gst_pad_push_event (pad, gst_event_new_reconfigure ()); gst_caps_unref (caps); return FALSE; } } - - GST_INFO_OBJECT (pad, "setting caps to %" GST_PTR_FORMAT, caps); - gst_caps_replace (&audiomixer->current_caps, caps); - - memcpy (&audiomixer->info, &info, sizeof (info)); - audiomixer->send_caps = TRUE; GST_OBJECT_UNLOCK (audiomixer); - /* send caps event later, after stream-start event */ + + ret = gst_audio_aggregator_set_src_caps (aagg, caps); + + if (ret) + gst_audio_aggregator_set_sink_caps (aagg, GST_AUDIO_AGGREGATOR_PAD (pad), + orig_caps); GST_INFO_OBJECT (pad, "handle caps change to %" GST_PTR_FORMAT, caps); gst_caps_unref (caps); - return TRUE; + return ret; /* ERRORS */ invalid_format: @@ -447,211 +409,6 @@ invalid_format: } } -/* FIXME, the duration query should reflect how long you will produce - * data, that is the amount of stream time until you will emit EOS. - * - * For synchronized mixing this is always the max of all the durations - * of upstream since we emit EOS when all of them finished. - * - * We don't do synchronized mixing so this really depends on where the - * streams where punched in and what their relative offsets are against - * eachother which we can get from the first timestamps we see. - * - * When we add a new stream (or remove a stream) the duration might - * also become invalid again and we need to post a new DURATION - * message to notify this fact to the parent. - * For now we take the max of all the upstream elements so the simple - * cases work at least somewhat. 
- */ -static gboolean -gst_audiomixer_query_duration (GstAudioMixer * audiomixer, GstQuery * query) -{ - gint64 max; - gboolean res; - GstFormat format; - GstIterator *it; - gboolean done; - GValue item = { 0, }; - - /* parse format */ - gst_query_parse_duration (query, &format, NULL); - - max = -1; - res = TRUE; - done = FALSE; - - it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (audiomixer)); - while (!done) { - GstIteratorResult ires; - - ires = gst_iterator_next (it, &item); - switch (ires) { - case GST_ITERATOR_DONE: - done = TRUE; - break; - case GST_ITERATOR_OK: - { - GstPad *pad = g_value_get_object (&item); - gint64 duration; - - /* ask sink peer for duration */ - res &= gst_pad_peer_query_duration (pad, format, &duration); - /* take max from all valid return values */ - if (res) { - /* valid unknown length, stop searching */ - if (duration == -1) { - max = duration; - done = TRUE; - } - /* else see if bigger than current max */ - else if (duration > max) - max = duration; - } - g_value_reset (&item); - break; - } - case GST_ITERATOR_RESYNC: - max = -1; - res = TRUE; - gst_iterator_resync (it); - break; - default: - res = FALSE; - done = TRUE; - break; - } - } - g_value_unset (&item); - gst_iterator_free (it); - - if (res) { - /* and store the max */ - GST_DEBUG_OBJECT (audiomixer, "Total duration in format %s: %" - GST_TIME_FORMAT, gst_format_get_name (format), GST_TIME_ARGS (max)); - gst_query_set_duration (query, format, max); - } - - return res; -} - -static gboolean -gst_audiomixer_src_query (GstAggregator * agg, GstQuery * query) -{ - GstAudioMixer *audiomixer = GST_AUDIO_MIXER (agg); - gboolean res = FALSE; - - switch (GST_QUERY_TYPE (query)) { - case GST_QUERY_POSITION: - { - GstFormat format; - - gst_query_parse_position (query, &format, NULL); - - switch (format) { - case GST_FORMAT_TIME: - /* FIXME, bring to stream time, might be tricky */ - gst_query_set_position (query, format, agg->segment.position); - res = TRUE; - break; - case GST_FORMAT_DEFAULT: - gst_query_set_position (query, format, audiomixer->offset); - res = TRUE; - break; - default: - break; - } - break; - } - case GST_QUERY_DURATION: - res = gst_audiomixer_query_duration (audiomixer, query); - break; - default: - res = - GST_AGGREGATOR_CLASS (gst_audiomixer_parent_class)->src_query - (agg, query); - break; - } - - return res; -} - -/* event handling */ - -typedef struct -{ - GstEvent *event; - gboolean flush; -} EventData; - -static gboolean -gst_audiomixer_src_event (GstAggregator * agg, GstEvent * event) -{ - gboolean result; - - GstAudioMixer *audiomixer = GST_AUDIO_MIXER (agg); - GST_DEBUG_OBJECT (agg->srcpad, "Got %s event on src pad", - GST_EVENT_TYPE_NAME (event)); - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_QOS: - /* QoS might be tricky */ - gst_event_unref (event); - return FALSE; - case GST_EVENT_NAVIGATION: - /* navigation is rather pointless. 
*/ - gst_event_unref (event); - return FALSE; - break; - case GST_EVENT_SEEK: - { - GstSeekFlags flags; - gdouble rate; - GstSeekType start_type, stop_type; - gint64 start, stop; - GstFormat seek_format, dest_format; - - /* parse the seek parameters */ - gst_event_parse_seek (event, &rate, &seek_format, &flags, &start_type, - &start, &stop_type, &stop); - - /* Check the seeking parametters before linking up */ - if ((start_type != GST_SEEK_TYPE_NONE) - && (start_type != GST_SEEK_TYPE_SET)) { - result = FALSE; - GST_DEBUG_OBJECT (audiomixer, - "seeking failed, unhandled seek type for start: %d", start_type); - goto done; - } - if ((stop_type != GST_SEEK_TYPE_NONE) && (stop_type != GST_SEEK_TYPE_SET)) { - result = FALSE; - GST_DEBUG_OBJECT (audiomixer, - "seeking failed, unhandled seek type for end: %d", stop_type); - goto done; - } - - dest_format = agg->segment.format; - if (seek_format != dest_format) { - result = FALSE; - GST_DEBUG_OBJECT (audiomixer, - "seeking failed, unhandled seek format: %d", seek_format); - goto done; - } - - /* Link up */ - result = GST_AGGREGATOR_CLASS (parent_class)->src_event (agg, event); - goto done; - } - break; - default: - break; - } - - return GST_AGGREGATOR_CLASS (parent_class)->src_event (agg, event); - -done: - return result; -} - static gboolean gst_audiomixer_sink_event (GstAggregator * agg, GstAggregatorPad * aggpad, GstEvent * event) @@ -673,34 +430,6 @@ gst_audiomixer_sink_event (GstAggregator * agg, GstAggregatorPad * aggpad, event = NULL; break; } - case GST_EVENT_SEGMENT: - { - const GstSegment *segment; - - gst_event_parse_segment (event, &segment); - if (segment->rate != agg->segment.rate) { - GST_ERROR_OBJECT (aggpad, - "Got segment event with wrong rate %lf, expected %lf", - segment->rate, agg->segment.rate); - res = FALSE; - gst_event_unref (event); - event = NULL; - } else if (segment->rate < 0.0) { - GST_ERROR_OBJECT (aggpad, "Negative rates not supported yet"); - res = FALSE; - gst_event_unref (event); - event = NULL; - } else { - GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (aggpad); - - /* Ideally, this should only be set when the new segment causes running - * times to change, and hence needs discont calculation in fill_buffer */ - GST_OBJECT_LOCK (pad); - pad->new_segment = TRUE; - GST_OBJECT_UNLOCK (pad); - } - break; - } default: break; } @@ -711,57 +440,13 @@ gst_audiomixer_sink_event (GstAggregator * agg, GstAggregatorPad * aggpad, return res; } -static void -gst_audiomixer_reset (GstAudioMixer * audiomixer) -{ - GstAggregator *agg = GST_AGGREGATOR (audiomixer); - - audiomixer->offset = 0; - agg->segment.position = -1; - - gst_audio_info_init (&audiomixer->info); - gst_caps_replace (&audiomixer->current_caps, NULL); - gst_buffer_replace (&audiomixer->current_buffer, NULL); -} - -static gboolean -gst_audiomixer_start (GstAggregator * agg) -{ - GstAudioMixer *audiomixer = GST_AUDIO_MIXER (agg); - - gst_audiomixer_reset (audiomixer); - - return TRUE; -} - -static gboolean -gst_audiomixer_stop (GstAggregator * agg) -{ - GstAudioMixer *audiomixer = GST_AUDIO_MIXER (agg); - - gst_audiomixer_reset (audiomixer); - - return TRUE; -} - -static GstFlowReturn -gst_audiomixer_flush (GstAggregator * agg) -{ - GstAudioMixer *audiomixer = GST_AUDIO_MIXER (agg); - - audiomixer->offset = 0; - agg->segment.position = -1; - gst_buffer_replace (&audiomixer->current_buffer, NULL); - - return GST_FLOW_OK; -} - static void gst_audiomixer_class_init (GstAudioMixerClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; GstElementClass 
*gstelement_class = (GstElementClass *) klass; GstAggregatorClass *agg_class = (GstAggregatorClass *) klass; + GstAudioAggregatorClass *aagg_class = (GstAudioAggregatorClass *) klass; gobject_class->set_property = gst_audiomixer_set_property; gobject_class->get_property = gst_audiomixer_get_property; @@ -773,25 +458,6 @@ gst_audiomixer_class_init (GstAudioMixerClass * klass) "Setting this property takes a reference to the supplied GstCaps " "object", GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD, - g_param_spec_uint64 ("alignment-threshold", "Alignment Threshold", - "Timestamp alignment threshold in nanoseconds", 0, - G_MAXUINT64 - 1, DEFAULT_ALIGNMENT_THRESHOLD, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_DISCONT_WAIT, - g_param_spec_uint64 ("discont-wait", "Discont Wait", - "Window of time in nanoseconds to wait before " - "creating a discontinuity", 0, - G_MAXUINT64 - 1, DEFAULT_DISCONT_WAIT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_OUTPUT_BUFFER_DURATION, - g_param_spec_uint64 ("output-buffer-duration", "Output Buffer Duration", - "Output block size in nanoseconds", 1, - G_MAXUINT64, DEFAULT_OUTPUT_BUFFER_DURATION, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&gst_audiomixer_src_template)); gst_element_class_add_pad_template (gstelement_class, @@ -807,35 +473,17 @@ gst_audiomixer_class_init (GstAudioMixerClass * klass) GST_DEBUG_FUNCPTR (gst_audiomixer_release_pad); agg_class->sinkpads_type = GST_TYPE_AUDIO_MIXER_PAD; - agg_class->start = gst_audiomixer_start; - agg_class->stop = gst_audiomixer_stop; - - agg_class->get_next_time = gst_audiomixer_get_next_time; agg_class->sink_query = GST_DEBUG_FUNCPTR (gst_audiomixer_sink_query); agg_class->sink_event = GST_DEBUG_FUNCPTR (gst_audiomixer_sink_event); - agg_class->aggregate = GST_DEBUG_FUNCPTR (gst_audiomixer_aggregate); - agg_class->clip = GST_DEBUG_FUNCPTR (gst_audiomixer_do_clip); - - agg_class->src_event = GST_DEBUG_FUNCPTR (gst_audiomixer_src_event); - agg_class->src_query = GST_DEBUG_FUNCPTR (gst_audiomixer_src_query); - - agg_class->flush = GST_DEBUG_FUNCPTR (gst_audiomixer_flush); + aagg_class->aggregate_one_buffer = gst_audiomixer_aggregate_one_buffer; } static void gst_audiomixer_init (GstAudioMixer * audiomixer) { - audiomixer->current_caps = NULL; - gst_audio_info_init (&audiomixer->info); - audiomixer->filter_caps = NULL; - audiomixer->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD; - audiomixer->discont_wait = DEFAULT_DISCONT_WAIT; - audiomixer->output_buffer_duration = DEFAULT_OUTPUT_BUFFER_DURATION; - gst_aggregator_set_latency (GST_AGGREGATOR (audiomixer), - audiomixer->output_buffer_duration, audiomixer->output_buffer_duration); } static void @@ -844,7 +492,6 @@ gst_audiomixer_dispose (GObject * object) GstAudioMixer *audiomixer = GST_AUDIO_MIXER (object); gst_caps_replace (&audiomixer->filter_caps, NULL); - gst_caps_replace (&audiomixer->current_caps, NULL); G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -877,18 +524,6 @@ gst_audiomixer_set_property (GObject * object, guint prop_id, GST_DEBUG_OBJECT (audiomixer, "set new caps %" GST_PTR_FORMAT, new_caps); break; } - case PROP_ALIGNMENT_THRESHOLD: - audiomixer->alignment_threshold = g_value_get_uint64 (value); - break; - case PROP_DISCONT_WAIT: - 
audiomixer->discont_wait = g_value_get_uint64 (value); - break; - case PROP_OUTPUT_BUFFER_DURATION: - audiomixer->output_buffer_duration = g_value_get_uint64 (value); - gst_aggregator_set_latency (GST_AGGREGATOR (audiomixer), - audiomixer->output_buffer_duration, - audiomixer->output_buffer_duration); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -907,15 +542,6 @@ gst_audiomixer_get_property (GObject * object, guint prop_id, GValue * value, gst_value_set_caps (value, audiomixer->filter_caps); GST_OBJECT_UNLOCK (audiomixer); break; - case PROP_ALIGNMENT_THRESHOLD: - g_value_set_uint64 (value, audiomixer->alignment_threshold); - break; - case PROP_DISCONT_WAIT: - g_value_set_uint64 (value, audiomixer->discont_wait); - break; - case PROP_OUTPUT_BUFFER_DURATION: - g_value_set_uint64 (value, audiomixer->output_buffer_duration); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -962,332 +588,118 @@ gst_audiomixer_release_pad (GstElement * element, GstPad * pad) GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad); } -static GstFlowReturn -gst_audiomixer_do_clip (GstAggregator * agg, - GstAggregatorPad * bpad, GstBuffer * buffer, GstBuffer ** out) -{ - GstAudioMixer *audiomixer = GST_AUDIO_MIXER (agg); - gint rate, bpf; - - rate = GST_AUDIO_INFO_RATE (&audiomixer->info); - bpf = GST_AUDIO_INFO_BPF (&audiomixer->info); - - buffer = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf); - - *out = buffer; - return GST_FLOW_OK; -} +/* Called with object lock and pad object lock held */ static gboolean -gst_audio_mixer_fill_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad, - GstBuffer * inbuf) +gst_audiomixer_aggregate_one_buffer (GstAudioAggregator * aagg, + GstAudioAggregatorPad * aaggpad, GstBuffer * inbuf, guint in_offset, + GstBuffer * outbuf, guint out_offset, guint num_frames) { - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstClockTime start_time, end_time; - gboolean discont = FALSE; - guint64 start_offset, end_offset; - GstClockTime timestamp, stream_time; - gint rate, bpf; - - g_assert (pad->buffer == NULL); - - rate = GST_AUDIO_INFO_RATE (&audiomixer->info); - bpf = GST_AUDIO_INFO_BPF (&audiomixer->info); - - timestamp = GST_BUFFER_PTS (inbuf); - stream_time = gst_segment_to_stream_time (&aggpad->segment, GST_FORMAT_TIME, - timestamp); - - /* sync object properties on stream time */ - /* TODO: Ideally we would want to do that on every sample */ - if (GST_CLOCK_TIME_IS_VALID (stream_time)) - gst_object_sync_values (GST_OBJECT (pad), stream_time); - - GST_OBJECT_LOCK (pad); - pad->position = 0; - pad->size = gst_buffer_get_size (inbuf); - - start_time = GST_BUFFER_PTS (inbuf); - end_time = - start_time + gst_util_uint64_scale_ceil (pad->size / bpf, - GST_SECOND, rate); - - start_offset = gst_util_uint64_scale (start_time, rate, GST_SECOND); - end_offset = start_offset + pad->size / bpf; - - if (GST_BUFFER_IS_DISCONT (inbuf) - || GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_RESYNC) - || pad->new_segment || pad->next_offset == -1) { - discont = TRUE; - pad->new_segment = FALSE; - } else { - guint64 diff, max_sample_diff; - - /* Check discont, based on audiobasesink */ - if (start_offset <= pad->next_offset) - diff = pad->next_offset - start_offset; - else - diff = start_offset - pad->next_offset; - - max_sample_diff = - gst_util_uint64_scale_int (audiomixer->alignment_threshold, rate, - GST_SECOND); - - /* Discont! 
*/ - if (G_UNLIKELY (diff >= max_sample_diff)) { - if (audiomixer->discont_wait > 0) { - if (pad->discont_time == GST_CLOCK_TIME_NONE) { - pad->discont_time = start_time; - } else if (start_time - pad->discont_time >= audiomixer->discont_wait) { - discont = TRUE; - pad->discont_time = GST_CLOCK_TIME_NONE; - } - } else { - discont = TRUE; - } - } else if (G_UNLIKELY (pad->discont_time != GST_CLOCK_TIME_NONE)) { - /* we have had a discont, but are now back on track! */ - pad->discont_time = GST_CLOCK_TIME_NONE; - } - } - - if (discont) { - /* Have discont, need resync */ - if (pad->next_offset != -1) - GST_INFO_OBJECT (pad, "Have discont. Expected %" - G_GUINT64_FORMAT ", got %" G_GUINT64_FORMAT, - pad->next_offset, start_offset); - pad->output_offset = -1; - pad->next_offset = end_offset; - } else { - pad->next_offset += pad->size / bpf; - } - - if (pad->output_offset == -1) { - GstClockTime start_running_time; - GstClockTime end_running_time; - guint64 start_running_time_offset; - guint64 end_running_time_offset; - - start_running_time = - gst_segment_to_running_time (&aggpad->segment, - GST_FORMAT_TIME, start_time); - end_running_time = - gst_segment_to_running_time (&aggpad->segment, - GST_FORMAT_TIME, end_time); - start_running_time_offset = - gst_util_uint64_scale (start_running_time, rate, GST_SECOND); - end_running_time_offset = - gst_util_uint64_scale (end_running_time, rate, GST_SECOND); - - if (end_running_time_offset < audiomixer->offset) { - /* Before output segment, drop */ - gst_buffer_unref (inbuf); - pad->buffer = NULL; - gst_aggregator_pad_drop_buffer (aggpad); - pad->position = 0; - pad->size = 0; - pad->output_offset = -1; - GST_DEBUG_OBJECT (pad, - "Buffer before segment or current position: %" G_GUINT64_FORMAT " < %" - G_GUINT64_FORMAT, end_running_time_offset, audiomixer->offset); - GST_OBJECT_UNLOCK (pad); - return FALSE; - } - - if (start_running_time_offset < audiomixer->offset) { - guint diff = (audiomixer->offset - start_running_time_offset) * bpf; - - pad->position += diff; - if (pad->position >= pad->size) { - /* Empty buffer, drop */ - gst_buffer_unref (inbuf); - pad->buffer = NULL; - gst_aggregator_pad_drop_buffer (aggpad); - pad->position = 0; - pad->size = 0; - pad->output_offset = -1; - GST_DEBUG_OBJECT (pad, - "Buffer before segment or current position: %" G_GUINT64_FORMAT - " < %" G_GUINT64_FORMAT, end_running_time_offset, - audiomixer->offset); - GST_OBJECT_UNLOCK (pad); - return FALSE; - } - } - - pad->output_offset = MAX (start_running_time_offset, audiomixer->offset); - GST_DEBUG_OBJECT (pad, - "Buffer resynced: Pad offset %" G_GUINT64_FORMAT - ", current mixer offset %" G_GUINT64_FORMAT, pad->output_offset, - audiomixer->offset); - } - - GST_LOG_OBJECT (pad, - "Queued new buffer at offset %" G_GUINT64_FORMAT, pad->output_offset); - pad->buffer = inbuf; - - GST_OBJECT_UNLOCK (pad); - return TRUE; -} - -static void -gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad, - GstMapInfo * outmap) -{ - guint overlap; - guint out_start; - GstBuffer *inbuf; + GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (aaggpad); GstMapInfo inmap; + GstMapInfo outmap; gint bpf; - guint blocksize; - gboolean drop_buf = FALSE; - - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - - blocksize = - gst_util_uint64_scale (audiomixer->output_buffer_duration, - GST_AUDIO_INFO_RATE (&audiomixer->info), GST_SECOND); - blocksize = MAX (1, blocksize); - - bpf = GST_AUDIO_INFO_BPF (&audiomixer->info); - - GST_OBJECT_LOCK (pad); - /* Overlap => mix */ - if 
(audiomixer->offset < pad->output_offset) - out_start = pad->output_offset - audiomixer->offset; - else - out_start = 0; - - overlap = pad->size / bpf - pad->position / bpf; - if (overlap > blocksize - out_start) - overlap = blocksize - out_start; - - inbuf = gst_aggregator_pad_get_buffer (aggpad); - if (inbuf == NULL) { - GST_OBJECT_UNLOCK (pad); - return; - } if (pad->mute || pad->volume < G_MINDOUBLE) { GST_DEBUG_OBJECT (pad, "Skipping muted pad"); - gst_buffer_unref (inbuf); - pad->position += overlap * bpf; - pad->output_offset += overlap; - if (pad->position >= pad->size) { - /* Buffer done, drop it */ - gst_buffer_replace (&pad->buffer, NULL); - drop_buf = TRUE; - } - GST_OBJECT_UNLOCK (pad); - if (drop_buf) - gst_aggregator_pad_drop_buffer (aggpad); - return; + return FALSE; } - if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) { - /* skip gap buffer */ - GST_LOG_OBJECT (pad, "skipping GAP buffer"); - gst_buffer_unref (inbuf); - pad->output_offset += pad->size / bpf; - /* Buffer done, drop it */ - gst_buffer_replace (&pad->buffer, NULL); - GST_OBJECT_UNLOCK (pad); - gst_aggregator_pad_drop_buffer (aggpad); - return; - } + bpf = GST_AUDIO_INFO_BPF (&aagg->info); + gst_buffer_map (outbuf, &outmap, GST_MAP_READWRITE); gst_buffer_map (inbuf, &inmap, GST_MAP_READ); GST_LOG_OBJECT (pad, "mixing %u bytes at offset %u from offset %u", - overlap * bpf, out_start * bpf, pad->position); + num_frames * bpf, out_offset * bpf, in_offset * bpf); + /* further buffers, need to add them */ if (pad->volume == 1.0) { - switch (audiomixer->info.finfo->format) { + switch (aagg->info.finfo->format) { case GST_AUDIO_FORMAT_U8: - audiomixer_orc_add_u8 ((gpointer) (outmap->data + out_start * bpf), - (gpointer) (inmap.data + pad->position), - overlap * audiomixer->info.channels); + audiomixer_orc_add_u8 ((gpointer) (outmap.data + out_offset * bpf), + (gpointer) (inmap.data + in_offset * bpf), + num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_S8: - audiomixer_orc_add_s8 ((gpointer) (outmap->data + out_start * bpf), - (gpointer) (inmap.data + pad->position), - overlap * audiomixer->info.channels); + audiomixer_orc_add_s8 ((gpointer) (outmap.data + out_offset * bpf), + (gpointer) (inmap.data + in_offset * bpf), + num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_U16: - audiomixer_orc_add_u16 ((gpointer) (outmap->data + out_start * bpf), - (gpointer) (inmap.data + pad->position), - overlap * audiomixer->info.channels); + audiomixer_orc_add_u16 ((gpointer) (outmap.data + out_offset * bpf), + (gpointer) (inmap.data + in_offset * bpf), + num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_S16: - audiomixer_orc_add_s16 ((gpointer) (outmap->data + out_start * bpf), - (gpointer) (inmap.data + pad->position), - overlap * audiomixer->info.channels); + audiomixer_orc_add_s16 ((gpointer) (outmap.data + out_offset * bpf), + (gpointer) (inmap.data + in_offset * bpf), + num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_U32: - audiomixer_orc_add_u32 ((gpointer) (outmap->data + out_start * bpf), - (gpointer) (inmap.data + pad->position), - overlap * audiomixer->info.channels); + audiomixer_orc_add_u32 ((gpointer) (outmap.data + out_offset * bpf), + (gpointer) (inmap.data + in_offset * bpf), + num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_S32: - audiomixer_orc_add_s32 ((gpointer) (outmap->data + out_start * bpf), - (gpointer) (inmap.data + pad->position), - overlap * audiomixer->info.channels); + audiomixer_orc_add_s32 ((gpointer) (outmap.data + 
out_offset * bpf), + (gpointer) (inmap.data + in_offset * bpf), + num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_F32: - audiomixer_orc_add_f32 ((gpointer) (outmap->data + out_start * bpf), - (gpointer) (inmap.data + pad->position), - overlap * audiomixer->info.channels); + audiomixer_orc_add_f32 ((gpointer) (outmap.data + out_offset * bpf), + (gpointer) (inmap.data + in_offset * bpf), + num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_F64: - audiomixer_orc_add_f64 ((gpointer) (outmap->data + out_start * bpf), - (gpointer) (inmap.data + pad->position), - overlap * audiomixer->info.channels); + audiomixer_orc_add_f64 ((gpointer) (outmap.data + out_offset * bpf), + (gpointer) (inmap.data + in_offset * bpf), + num_frames * aagg->info.channels); break; default: g_assert_not_reached (); break; } } else { - switch (audiomixer->info.finfo->format) { + switch (aagg->info.finfo->format) { case GST_AUDIO_FORMAT_U8: - audiomixer_orc_add_volume_u8 ((gpointer) (outmap->data + - out_start * bpf), (gpointer) (inmap.data + pad->position), - pad->volume_i8, overlap * audiomixer->info.channels); + audiomixer_orc_add_volume_u8 ((gpointer) (outmap.data + + out_offset * bpf), (gpointer) (inmap.data + in_offset * bpf), + pad->volume_i8, num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_S8: - audiomixer_orc_add_volume_s8 ((gpointer) (outmap->data + - out_start * bpf), (gpointer) (inmap.data + pad->position), - pad->volume_i8, overlap * audiomixer->info.channels); + audiomixer_orc_add_volume_s8 ((gpointer) (outmap.data + + out_offset * bpf), (gpointer) (inmap.data + in_offset * bpf), + pad->volume_i8, num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_U16: - audiomixer_orc_add_volume_u16 ((gpointer) (outmap->data + - out_start * bpf), (gpointer) (inmap.data + pad->position), - pad->volume_i16, overlap * audiomixer->info.channels); + audiomixer_orc_add_volume_u16 ((gpointer) (outmap.data + + out_offset * bpf), (gpointer) (inmap.data + in_offset * bpf), + pad->volume_i16, num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_S16: - audiomixer_orc_add_volume_s16 ((gpointer) (outmap->data + - out_start * bpf), (gpointer) (inmap.data + pad->position), - pad->volume_i16, overlap * audiomixer->info.channels); + audiomixer_orc_add_volume_s16 ((gpointer) (outmap.data + + out_offset * bpf), (gpointer) (inmap.data + in_offset * bpf), + pad->volume_i16, num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_U32: - audiomixer_orc_add_volume_u32 ((gpointer) (outmap->data + - out_start * bpf), (gpointer) (inmap.data + pad->position), - pad->volume_i32, overlap * audiomixer->info.channels); + audiomixer_orc_add_volume_u32 ((gpointer) (outmap.data + + out_offset * bpf), (gpointer) (inmap.data + in_offset * bpf), + pad->volume_i32, num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_S32: - audiomixer_orc_add_volume_s32 ((gpointer) (outmap->data + - out_start * bpf), (gpointer) (inmap.data + pad->position), - pad->volume_i32, overlap * audiomixer->info.channels); + audiomixer_orc_add_volume_s32 ((gpointer) (outmap.data + + out_offset * bpf), (gpointer) (inmap.data + in_offset * bpf), + pad->volume_i32, num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_F32: - audiomixer_orc_add_volume_f32 ((gpointer) (outmap->data + - out_start * bpf), (gpointer) (inmap.data + pad->position), - pad->volume, overlap * audiomixer->info.channels); + audiomixer_orc_add_volume_f32 ((gpointer) (outmap.data + + out_offset * bpf), (gpointer) (inmap.data + 
in_offset * bpf), + pad->volume, num_frames * aagg->info.channels); break; case GST_AUDIO_FORMAT_F64: - audiomixer_orc_add_volume_f64 ((gpointer) (outmap->data + - out_start * bpf), (gpointer) (inmap.data + pad->position), - pad->volume, overlap * audiomixer->info.channels); + audiomixer_orc_add_volume_f64 ((gpointer) (outmap.data + + out_offset * bpf), (gpointer) (inmap.data + in_offset * bpf), + pad->volume, num_frames * aagg->info.channels); break; default: g_assert_not_reached (); @@ -1295,321 +707,12 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad, } } gst_buffer_unmap (inbuf, &inmap); - gst_buffer_unref (inbuf); - - pad->position += overlap * bpf; - pad->output_offset += overlap; - - if (pad->position == pad->size) { - /* Buffer done, drop it */ - gst_buffer_replace (&pad->buffer, NULL); - GST_DEBUG_OBJECT (pad, "Finished mixing buffer, waiting for next"); - drop_buf = TRUE; - } - - GST_OBJECT_UNLOCK (pad); - - if (drop_buf) - gst_aggregator_pad_drop_buffer (aggpad); -} - -static GstFlowReturn -gst_audiomixer_aggregate (GstAggregator * agg, gboolean timeout) -{ - /* Get all pads that have data for us and store them in a - * new list. - * - * Calculate the current output offset/timestamp and - * offset_end/timestamp_end. Allocate a silence buffer - * for this and store it. - * - * For all pads: - * 1) Once per input buffer (cached) - * 1) Check discont (flag and timestamp with tolerance) - * 2) If discont or new, resync. That means: - * 1) Drop all start data of the buffer that comes before - * the current position/offset. - * 2) Calculate the offset (output segment!) that the first - * frame of the input buffer corresponds to. Base this on - * the running time. - * - * 2) If the current pad's offset/offset_end overlaps with the output - * offset/offset_end, mix it at the appropiate position in the output - * buffer and advance the pad's position. Remember if this pad needs - * a new buffer to advance behind the output offset_end. - * - * 3) If we had no pad with a buffer, go EOS. - * - * 4) If we had at least one pad that did not advance behind output - * offset_end, let collected be called again for the current - * output offset/offset_end. 
- */ - GstAudioMixer *audiomixer; - GList *iter; - GstFlowReturn ret; - GstBuffer *outbuf = NULL; - GstMapInfo outmap; - gint64 next_offset; - gint64 next_timestamp; - gint rate, bpf; - gboolean dropped = FALSE; - gboolean is_eos = TRUE; - gboolean is_done = TRUE; - guint blocksize; - - audiomixer = GST_AUDIO_MIXER (agg); - - /* Update position from the segment start/stop if needed */ - if (agg->segment.position == -1) { - if (agg->segment.rate > 0.0) - agg->segment.position = agg->segment.start; - else - agg->segment.position = agg->segment.stop; - } - - if (G_UNLIKELY (audiomixer->info.finfo->format == GST_AUDIO_FORMAT_UNKNOWN)) { - if (timeout) { - GST_DEBUG_OBJECT (audiomixer, - "Got timeout before receiving any caps, don't output anything"); - - /* Advance position */ - if (agg->segment.rate > 0.0) - agg->segment.position += audiomixer->output_buffer_duration; - else if (agg->segment.position > audiomixer->output_buffer_duration) - agg->segment.position -= audiomixer->output_buffer_duration; - else - agg->segment.position = 0; - - return GST_FLOW_OK; - } else { - goto not_negotiated; - } - } - - blocksize = - gst_util_uint64_scale (audiomixer->output_buffer_duration, - GST_AUDIO_INFO_RATE (&audiomixer->info), GST_SECOND); - blocksize = MAX (1, blocksize); - - if (audiomixer->send_caps) { - gst_aggregator_set_src_caps (agg, audiomixer->current_caps); - - audiomixer->offset = gst_util_uint64_scale (agg->segment.position, - GST_AUDIO_INFO_RATE (&audiomixer->info), GST_SECOND); - - audiomixer->send_caps = FALSE; - } - - rate = GST_AUDIO_INFO_RATE (&audiomixer->info); - bpf = GST_AUDIO_INFO_BPF (&audiomixer->info); - - /* for the next timestamp, use the sample counter, which will - * never accumulate rounding errors */ - - /* FIXME: Reverse mixing does not work at all yet */ - if (agg->segment.rate > 0.0) { - next_offset = audiomixer->offset + blocksize; - } else { - next_offset = audiomixer->offset - blocksize; - } - next_timestamp = gst_util_uint64_scale (next_offset, GST_SECOND, rate); - - if (audiomixer->current_buffer) { - outbuf = audiomixer->current_buffer; - } else { - outbuf = gst_buffer_new_and_alloc (blocksize * bpf); - gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE); - gst_audio_format_fill_silence (audiomixer->info.finfo, outmap.data, - outmap.size); - gst_buffer_unmap (outbuf, &outmap); - audiomixer->current_buffer = outbuf; - } - - GST_LOG_OBJECT (agg, - "Starting to mix %u samples for offset %" G_GUINT64_FORMAT - " with timestamp %" GST_TIME_FORMAT, blocksize, - audiomixer->offset, GST_TIME_ARGS (agg->segment.position)); - - gst_buffer_map (outbuf, &outmap, GST_MAP_READWRITE); - - GST_OBJECT_LOCK (agg); - for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) { - GstBuffer *inbuf; - GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (iter->data); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (iter->data); - - if (!gst_aggregator_pad_is_eos (aggpad)) - is_eos = FALSE; - - inbuf = gst_aggregator_pad_get_buffer (aggpad); - if (!inbuf) { - if (timeout) { - if (pad->output_offset < next_offset) { - gint64 diff = next_offset - pad->output_offset; - - GST_LOG_OBJECT (pad, "Timeout, missing %" G_GINT64_FORMAT " frames (%" - GST_TIME_FORMAT ")", diff, - GST_TIME_ARGS (gst_util_uint64_scale (diff, GST_SECOND, rate))); - } - } else if (!gst_aggregator_pad_is_eos (aggpad)) { - is_done = FALSE; - } - continue; - } - - g_assert (!pad->buffer || pad->buffer == inbuf); - - /* New buffer? 
*/ - if (!pad->buffer) { - /* Takes ownership of buffer */ - if (!gst_audio_mixer_fill_buffer (audiomixer, pad, inbuf)) { - dropped = TRUE; - continue; - } - } else { - gst_buffer_unref (inbuf); - } - - if (!pad->buffer && !dropped && gst_aggregator_pad_is_eos (aggpad)) { - GST_DEBUG_OBJECT (aggpad, "Pad is in EOS state"); - continue; - } - - g_assert (pad->buffer); - - /* This pad is lacking behind, we need to update the offset - * and maybe drop the current buffer */ - if (pad->output_offset < audiomixer->offset) { - gint64 diff = audiomixer->offset - pad->output_offset; - gint bpf = GST_AUDIO_INFO_BPF (&audiomixer->info); - - if (pad->position + (diff * bpf) > pad->size) - diff = (pad->size - pad->position) / bpf; - pad->position += diff * bpf; - pad->output_offset += diff; - - if (pad->position == pad->size) { - /* Buffer done, drop it */ - gst_buffer_replace (&pad->buffer, NULL); - gst_aggregator_pad_drop_buffer (aggpad); - dropped = TRUE; - continue; - } - } - - if (pad->output_offset >= audiomixer->offset - && pad->output_offset < audiomixer->offset + blocksize && pad->buffer) { - GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset"); - gst_audio_mixer_mix_buffer (audiomixer, pad, &outmap); - - if (pad->output_offset >= next_offset) { - GST_DEBUG_OBJECT (pad, - "Pad is after current offset: %" G_GUINT64_FORMAT " >= %" - G_GUINT64_FORMAT, pad->output_offset, next_offset); - } else { - is_done = FALSE; - } - } - } - GST_OBJECT_UNLOCK (agg); - gst_buffer_unmap (outbuf, &outmap); - if (dropped) { - /* We dropped a buffer, retry */ - GST_INFO_OBJECT (audiomixer, - "A pad dropped a buffer, wait for the next one"); - return GST_FLOW_OK; - } - - if (!is_done && !is_eos) { - /* Get more buffers */ - GST_INFO_OBJECT (audiomixer, - "We're not done yet for the current offset, waiting for more data"); - return GST_FLOW_OK; - } - - if (is_eos) { - gint64 max_offset = 0; - gboolean empty_buffer = TRUE; - - GST_DEBUG_OBJECT (audiomixer, "We're EOS"); - - GST_OBJECT_LOCK (agg); - for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) { - GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (iter->data); - - max_offset = MAX ((gint64) max_offset, (gint64) pad->output_offset); - if (pad->output_offset > audiomixer->offset) - empty_buffer = FALSE; - } - GST_OBJECT_UNLOCK (agg); - - /* This means EOS or no pads at all */ - if (empty_buffer) { - gst_buffer_replace (&audiomixer->current_buffer, NULL); - goto eos; - } - - if (max_offset <= next_offset) { - GST_DEBUG_OBJECT (audiomixer, - "Last buffer is incomplete: %" G_GUINT64_FORMAT " <= %" - G_GUINT64_FORMAT, max_offset, next_offset); - next_offset = max_offset; - if (next_offset > audiomixer->offset) - gst_buffer_resize (outbuf, 0, (next_offset - audiomixer->offset) * bpf); - - next_timestamp = gst_util_uint64_scale (next_offset, GST_SECOND, rate); - } - } - - /* set timestamps on the output buffer */ - if (agg->segment.rate > 0.0) { - GST_BUFFER_TIMESTAMP (outbuf) = agg->segment.position; - GST_BUFFER_OFFSET (outbuf) = audiomixer->offset; - GST_BUFFER_OFFSET_END (outbuf) = next_offset; - GST_BUFFER_DURATION (outbuf) = next_timestamp - agg->segment.position; - } else { - GST_BUFFER_TIMESTAMP (outbuf) = next_timestamp; - GST_BUFFER_OFFSET (outbuf) = next_offset; - GST_BUFFER_OFFSET_END (outbuf) = audiomixer->offset; - GST_BUFFER_DURATION (outbuf) = agg->segment.position - next_timestamp; - } - - audiomixer->offset = next_offset; - agg->segment.position = next_timestamp; - - /* send it out */ - GST_LOG_OBJECT (audiomixer, - "pushing outbuf %p, 
timestamp %" GST_TIME_FORMAT " offset %" - G_GINT64_FORMAT, outbuf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)), - GST_BUFFER_OFFSET (outbuf)); - - ret = gst_aggregator_finish_buffer (agg, audiomixer->current_buffer); - audiomixer->current_buffer = NULL; - - GST_LOG_OBJECT (audiomixer, "pushed outbuf, result = %s", - gst_flow_get_name (ret)); - - if (ret == GST_FLOW_OK && is_eos) - goto eos; - - return ret; - /* ERRORS */ -not_negotiated: - { - GST_ELEMENT_ERROR (audiomixer, STREAM, FORMAT, (NULL), - ("Unknown data received, not negotiated")); - return GST_FLOW_NOT_NEGOTIATED; - } - -eos: - { - GST_DEBUG_OBJECT (audiomixer, "EOS"); - return GST_FLOW_EOS; - } + return TRUE; } + /* GstChildProxy implementation */ static GObject * gst_audiomixer_child_proxy_get_child_by_index (GstChildProxy * child_proxy, diff --git a/gst/audiomixer/gstaudiomixer.h b/gst/audiomixer/gstaudiomixer.h index 9507e6c5d8..add6e32f47 100644 --- a/gst/audiomixer/gstaudiomixer.h +++ b/gst/audiomixer/gstaudiomixer.h @@ -25,8 +25,8 @@ #define __GST_AUDIO_MIXER_H__ #include -#include #include +#include "gstaudioaggregator.h" G_BEGIN_DECLS @@ -49,32 +49,14 @@ typedef struct _GstAudioMixerPadClass GstAudioMixerPadClass; * The audiomixer object structure. */ struct _GstAudioMixer { - GstAggregator aggregator; - - /* the next are valid for both int and float */ - GstAudioInfo info; - - /* counters to keep track of timestamps */ - gint64 offset; - /* Buffer starting at offset containing block_size samples */ - GstBuffer *current_buffer; - - /* current caps */ - GstCaps *current_caps; - gboolean send_caps; + GstAudioAggregator element; /* target caps (set via property) */ GstCaps *filter_caps; - - GstClockTime alignment_threshold; - GstClockTime discont_wait; - - /* Duration of every output buffer */ - GstClockTime output_buffer_duration; }; struct _GstAudioMixerClass { - GstAggregatorClass parent_class; + GstAudioAggregatorClass parent_class; }; GType gst_audiomixer_get_type (void); @@ -87,36 +69,17 @@ GType gst_audiomixer_get_type (void); #define GST_AUDIO_MIXER_PAD_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_AUDIO_MIXER_PAD,GstAudioMixerPadClass)) struct _GstAudioMixerPad { - GstAggregatorPad parent; + GstAudioAggregatorPad parent; gdouble volume; gint volume_i32; gint volume_i16; gint volume_i8; gboolean mute; - - /* < private > */ - GstBuffer *buffer; /* current buffer we're mixing, - for comparison with collect.buffer - to see if we need to update our - cached values. */ - guint position, size; - - guint64 output_offset; /* Offset in output segment that - collect.pos refers to in the - current buffer. */ - - guint64 next_offset; /* Next expected offset in the input segment */ - - /* Last time we noticed a discont */ - GstClockTime discont_time; - - /* A new unhandled segment event has been received */ - gboolean new_segment; }; struct _GstAudioMixerPadClass { - GstAggregatorPadClass parent_class; + GstAudioAggregatorPadClass parent_class; }; GType gst_audiomixer_pad_get_type (void);
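
With the timing and buffer bookkeeping moved into GstAudioAggregator, audiomixer itself keeps only the per-pad mixing step (aggregate_one_buffer) and its caps/volume handling. The offset arithmetic that both the removed aggregate loop and the new base class rely on is the same: the output-buffer-duration property is turned into a block size in frames, and the running sample offset is turned back into timestamps, always via integer rescaling so rounding errors never accumulate. A minimal sketch of that arithmetic, with hypothetical helper names (not the library code):

#include <gst/gst.h>

/* Sketch only: helpers mirroring the offset arithmetic used by the
 * (removed) aggregate loop and by the new base class.  Keeping the
 * running position as a frame counter and rescaling it with
 * gst_util_uint64_scale () avoids accumulating rounding errors. */
static guint
output_duration_to_frames (GstClockTime duration, gint rate)
{
  guint64 blocksize;

  /* e.g. 10ms at 48000 Hz -> 480 frames per output buffer */
  blocksize = gst_util_uint64_scale (duration, rate, GST_SECOND);
  return MAX (1, (guint) blocksize);
}

static GstClockTime
frames_to_running_time (guint64 offset, gint rate)
{
  /* the inverse mapping used when timestamping the output buffer */
  return gst_util_uint64_scale (offset, GST_SECOND, rate);
}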
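
The resync logic removed from gst_audio_mixer_fill_buffer above is what the alignment-threshold and discont-wait properties continue to control in the base class: an input buffer only triggers a discontinuity if its start offset drifts further than alignment-threshold away from the expected next offset, and the drift persists for longer than discont-wait. A condensed sketch of that decision under the same assumptions as the removed code (hypothetical function, per-pad state passed in explicitly):

#include <gst/gst.h>

/* Sketch of the discont decision; discont_time is the per-pad
 * "when did the drift start" state kept in the pad private struct. */
static gboolean
check_discont (guint64 start_offset, guint64 next_offset,
    GstClockTime start_time, GstClockTime * discont_time,
    GstClockTime alignment_threshold, GstClockTime discont_wait, gint rate)
{
  guint64 diff, max_sample_diff;

  diff = (start_offset <= next_offset) ?
      next_offset - start_offset : start_offset - next_offset;
  max_sample_diff =
      gst_util_uint64_scale_int (alignment_threshold, rate, GST_SECOND);

  if (diff < max_sample_diff) {
    /* back on track: forget any pending discont */
    *discont_time = GST_CLOCK_TIME_NONE;
    return FALSE;
  }

  if (discont_wait == 0)
    return TRUE;

  if (*discont_time == GST_CLOCK_TIME_NONE) {
    /* remember when the drift started and keep waiting */
    *discont_time = start_time;
    return FALSE;
  }

  if (start_time - *discont_time >= discont_wait) {
    *discont_time = GST_CLOCK_TIME_NONE;
    return TRUE;
  }

  return FALSE;
}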
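
gst_audiomixer_aggregate_one_buffer itself just maps both buffers and dispatches to a format-specific Orc kernel over num_frames * channels interleaved samples. For reference, a plain-C approximation of the S16 case, assuming (as the integer Orc kernels appear to do) a per-sample saturating add; the real kernels are generated from the plugin's Orc sources and this is only an illustration:

#include <glib.h>

/* Rough plain-C equivalent of audiomixer_orc_add_s16 (assumption:
 * saturating addition, sample by sample over interleaved data). */
static void
add_s16_c (gint16 * dest, const gint16 * src, guint n_samples)
{
  guint i;

  for (i = 0; i < n_samples; i++) {
    gint32 sum = (gint32) dest[i] + (gint32) src[i];

    dest[i] = CLAMP (sum, G_MININT16, G_MAXINT16);
  }
}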