Make ringbuffer faster and more simple by removing the locks in the playback thread.

Original commit message from CVS:
Make ringbuffer faster and more simple by removing the locks
in the playback thread.
Add sample accurate playback based on buffer sample offsets.
Make the baseaudiosink provide a clock.
Parse caps in the base class.
Correctly handle seeking, flushing and state changes.
This commit is contained in:
Wim Taymans 2005-04-28 16:15:42 +00:00
parent 37822dc3fb
commit 235ea5989c
11 changed files with 863 additions and 511 deletions

View file

@ -1,3 +1,47 @@
2005-04-28 Wim Taymans <wim@fluendo.com>
* gst-libs/gst/audio/Makefile.am:
* gst-libs/gst/audio/audio.h:
* gst-libs/gst/audio/audioclock.c:
* gst-libs/gst/audio/audioclock.h:
* gst-libs/gst/audio/gstaudioclock.c: (gst_audio_clock_get_type),
(gst_audio_clock_class_init), (gst_audio_clock_init),
(gst_audio_clock_new), (gst_audio_clock_get_internal_time):
* gst-libs/gst/audio/gstaudioclock.h:
* gst-libs/gst/audio/gstaudiosink.c:
(gst_audioringbuffer_get_type), (gst_audioringbuffer_class_init),
(audioringbuffer_thread_func), (gst_audioringbuffer_init),
(gst_audioringbuffer_acquire), (gst_audioringbuffer_release),
(gst_audioringbuffer_play), (gst_audioringbuffer_stop),
(gst_audioringbuffer_delay), (gst_audiosink_class_init),
(gst_audiosink_create_ringbuffer):
* gst-libs/gst/audio/gstbaseaudiosink.c:
(gst_baseaudiosink_class_init), (gst_baseaudiosink_init),
(gst_baseaudiosink_get_clock), (gst_baseaudiosink_get_time),
(gst_baseaudiosink_set_property), (gst_baseaudiosink_get_property),
(build_linear_format), (debug_spec_caps), (debug_spec_buffer),
(gst_baseaudiosink_setcaps), (gst_baseaudiosink_get_times),
(gst_baseaudiosink_event), (gst_baseaudiosink_preroll),
(gst_baseaudiosink_render), (gst_baseaudiosink_create_ringbuffer),
(gst_baseaudiosink_callback), (gst_baseaudiosink_change_state):
* gst-libs/gst/audio/gstbaseaudiosink.h:
* gst-libs/gst/audio/gstringbuffer.c: (gst_ringbuffer_get_type),
(gst_ringbuffer_init), (gst_ringbuffer_finalize),
(gst_ringbuffer_set_callback), (gst_ringbuffer_acquire),
(gst_ringbuffer_release), (gst_ringbuffer_play),
(gst_ringbuffer_pause), (gst_ringbuffer_stop),
(gst_ringbuffer_delay), (gst_ringbuffer_played_samples),
(gst_ringbuffer_set_sample), (wait_segment),
(gst_ringbuffer_commit), (gst_ringbuffer_prepare_read),
(gst_ringbuffer_advance), (gst_ringbuffer_clear):
* gst-libs/gst/audio/gstringbuffer.h:
Make ringbuffer faster and more simple by removing the locks
in the playback thread.
Add sample accurate playback based on buffer sample offsets.
Make the baseaudiosink provide a clock.
Parse caps in the base class.
Correctly handle seeking, flushing and state changes.
2005-04-25 Thomas Vander Stichele <thomas at apestaart dot org>
* configure.ac:

View file

@ -14,7 +14,7 @@ EXTRA_DIST = gstaudiofiltertemplate.c make_filter
CLEANFILES = gstaudiofilterexample.c \
$(BUILT_SOURCES)
libgstaudio_@GST_MAJORMINOR@_la_SOURCES = audio.c audioclock.c \
libgstaudio_@GST_MAJORMINOR@_la_SOURCES = audio.c gstaudioclock.c \
multichannel.c \
gstaudiosink.c \
gstbaseaudiosink.c \
@ -24,7 +24,7 @@ nodist_libgstaudio_@GST_MAJORMINOR@_la_SOURCES = $(built_sources) $(built_header
libgstaudio_@GST_MAJORMINOR@includedir = $(includedir)/gstreamer-@GST_MAJORMINOR@/gst/audio
libgstaudio_@GST_MAJORMINOR@include_HEADERS = \
audio.h \
audioclock.h \
gstaudioclock.h \
gstaudiofilter.h \
gstaudiosink.h \
gstbaseaudiosink.h \

View file

@ -20,8 +20,6 @@
#include <gst/gst.h>
#include <gst/audio/audioclock.h>
#ifndef __GST_AUDIO_AUDIO_H__
#define __GST_AUDIO_AUDIO_H__

View file

@ -1,205 +0,0 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
*
* audioclock.c: Clock for use by audio plugins
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "audioclock.h"
static void gst_audio_clock_class_init (GstAudioClockClass * klass);
static void gst_audio_clock_init (GstAudioClock * clock);
static GstClockTime gst_audio_clock_get_internal_time (GstClock * clock);
static GstClockReturn gst_audio_clock_id_wait_async (GstClock * clock,
GstClockEntry * entry);
static void gst_audio_clock_id_unschedule (GstClock * clock,
GstClockEntry * entry);
static GstSystemClockClass *parent_class = NULL;
/* static guint gst_audio_clock_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_audio_clock_get_type (void)
{
static GType clock_type = 0;
if (!clock_type) {
static const GTypeInfo clock_info = {
sizeof (GstAudioClockClass),
NULL,
NULL,
(GClassInitFunc) gst_audio_clock_class_init,
NULL,
NULL,
sizeof (GstAudioClock),
4,
(GInstanceInitFunc) gst_audio_clock_init,
NULL
};
clock_type = g_type_register_static (GST_TYPE_SYSTEM_CLOCK, "GstAudioClock",
&clock_info, 0);
}
return clock_type;
}
static void
gst_audio_clock_class_init (GstAudioClockClass * klass)
{
GObjectClass *gobject_class;
GstObjectClass *gstobject_class;
GstClockClass *gstclock_class;
gobject_class = (GObjectClass *) klass;
gstobject_class = (GstObjectClass *) klass;
gstclock_class = (GstClockClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_SYSTEM_CLOCK);
gstclock_class->get_internal_time = gst_audio_clock_get_internal_time;
gstclock_class->wait_async = gst_audio_clock_id_wait_async;
gstclock_class->unschedule = gst_audio_clock_id_unschedule;
}
static void
gst_audio_clock_init (GstAudioClock * clock)
{
gst_object_set_name (GST_OBJECT (clock), "GstAudioClock");
clock->prev1 = 0;
clock->prev2 = 0;
}
GstClock *
gst_audio_clock_new (gchar * name, GstAudioClockGetTimeFunc func,
gpointer user_data)
{
GstAudioClock *aclock =
GST_AUDIO_CLOCK (g_object_new (GST_TYPE_AUDIO_CLOCK, NULL));
aclock->func = func;
aclock->user_data = user_data;
aclock->adjust = 0;
return (GstClock *) aclock;
}
void
gst_audio_clock_set_active (GstAudioClock * aclock, gboolean active)
{
GstClockTime audio_time, system_time;
GstClock *clock;
GTimeVal timeval;
g_return_if_fail (GST_IS_AUDIO_CLOCK (aclock));
clock = GST_CLOCK (aclock);
if (active == aclock->active) {
/* Nothing to do. */
return;
}
audio_time = aclock->func (clock, aclock->user_data);
g_get_current_time (&timeval);
system_time = GST_TIMEVAL_TO_TIME (timeval);
/* Set the new adjust value in such a way that there's no abrupt
discontinuity, i.e. if gst_audio_clock_get_internal_time is
invoked right before and right after (de)activating the clock,
the values returned will be close to each other, and the second
value will be greater than or equal than the first. */
if (active) {
aclock->adjust = aclock->adjust + system_time - audio_time;
} else {
aclock->adjust = aclock->adjust + audio_time - system_time;
}
aclock->active = active;
}
static GstClockTime
gst_audio_clock_get_internal_time (GstClock * clock)
{
GstAudioClock *aclock = GST_AUDIO_CLOCK (clock);
if (aclock->active) {
return aclock->func (clock, aclock->user_data) + aclock->adjust;
} else {
GTimeVal timeval;
g_get_current_time (&timeval);
return GST_TIMEVAL_TO_TIME (timeval) + aclock->adjust;
}
}
void
gst_audio_clock_update_time (GstAudioClock * aclock, GstClockTime time)
{
/* I don't know of a purpose in updating these; perhaps they can be removed */
aclock->prev2 = aclock->prev1;
aclock->prev1 = time;
/* FIXME: the wait_async subsystem should be made threadsafe, but I don't want
* to lock and unlock a mutex on every iteration... */
while (aclock->async_entries) {
GstClockEntry *entry = (GstClockEntry *) aclock->async_entries->data;
if (entry->time > time)
break;
entry->func ((GstClock *) aclock, time, entry, entry->user_data);
aclock->async_entries = g_slist_delete_link (aclock->async_entries,
aclock->async_entries);
/* do I need to free the entry? */
}
}
static gint
compare_clock_entries (GstClockEntry * entry1, GstClockEntry * entry2)
{
return entry1->time - entry2->time;
}
static GstClockReturn
gst_audio_clock_id_wait_async (GstClock * clock, GstClockEntry * entry)
{
GstAudioClock *aclock = (GstAudioClock *) clock;
aclock->async_entries = g_slist_insert_sorted (aclock->async_entries,
entry, (GCompareFunc) compare_clock_entries);
/* is this the proper return val? */
return GST_CLOCK_EARLY;
}
static void
gst_audio_clock_id_unschedule (GstClock * clock, GstClockEntry * entry)
{
GstAudioClock *aclock = (GstAudioClock *) clock;
aclock->async_entries = g_slist_remove (aclock->async_entries, entry);
}

View file

@ -0,0 +1,105 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
*
* audioclock.c: Clock for use by audio plugins
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstaudioclock.h"
static void gst_audio_clock_class_init (GstAudioClockClass * klass);
static void gst_audio_clock_init (GstAudioClock * clock);
static GstClockTime gst_audio_clock_get_internal_time (GstClock * clock);
static GstSystemClockClass *parent_class = NULL;
/* static guint gst_audio_clock_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_audio_clock_get_type (void)
{
static GType clock_type = 0;
if (!clock_type) {
static const GTypeInfo clock_info = {
sizeof (GstAudioClockClass),
NULL,
NULL,
(GClassInitFunc) gst_audio_clock_class_init,
NULL,
NULL,
sizeof (GstAudioClock),
4,
(GInstanceInitFunc) gst_audio_clock_init,
NULL
};
clock_type = g_type_register_static (GST_TYPE_SYSTEM_CLOCK, "GstAudioClock",
&clock_info, 0);
}
return clock_type;
}
static void
gst_audio_clock_class_init (GstAudioClockClass * klass)
{
GObjectClass *gobject_class;
GstObjectClass *gstobject_class;
GstClockClass *gstclock_class;
gobject_class = (GObjectClass *) klass;
gstobject_class = (GstObjectClass *) klass;
gstclock_class = (GstClockClass *) klass;
parent_class = g_type_class_ref (GST_TYPE_SYSTEM_CLOCK);
gstclock_class->get_internal_time = gst_audio_clock_get_internal_time;
}
static void
gst_audio_clock_init (GstAudioClock * clock)
{
gst_object_set_name (GST_OBJECT (clock), "GstAudioClock");
}
GstClock *
gst_audio_clock_new (gchar * name, GstAudioClockGetTimeFunc func,
gpointer user_data)
{
GstAudioClock *aclock =
GST_AUDIO_CLOCK (g_object_new (GST_TYPE_AUDIO_CLOCK, NULL));
aclock->func = func;
aclock->user_data = user_data;
return (GstClock *) aclock;
}
static GstClockTime
gst_audio_clock_get_internal_time (GstClock * clock)
{
GstAudioClock *aclock = GST_AUDIO_CLOCK (clock);
return aclock->func (clock, aclock->user_data);
}

View file

@ -1,8 +1,8 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
* 2005 Wim Taymans <wim@fluendo.com>
*
* audioclock.h: Clock for use by audio plugins
* gstaudioclock.h: Clock for use by audio plugins
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@ -44,22 +44,13 @@ typedef struct _GstAudioClockClass GstAudioClockClass;
typedef GstClockTime (*GstAudioClockGetTimeFunc) (GstClock *clock, gpointer user_data);
struct _GstAudioClock {
GstSystemClock clock;
GstClockTime prev1, prev2;
/* --- protected --- */
GstAudioClockGetTimeFunc func;
gpointer user_data;
GstClockTimeDiff adjust;
GSList *async_entries;
gboolean active;
gpointer _gst_reserved[GST_PADDING];
};
@ -72,9 +63,6 @@ struct _GstAudioClockClass {
GType gst_audio_clock_get_type (void);
GstClock* gst_audio_clock_new (gchar *name, GstAudioClockGetTimeFunc func,
gpointer user_data);
void gst_audio_clock_set_active (GstAudioClock *aclock, gboolean active);
void gst_audio_clock_update_time (GstAudioClock *aclock, GstClockTime time);
G_END_DECLS

View file

@ -125,6 +125,7 @@ gst_audioringbuffer_class_init (GstAudioRingBufferClass * klass)
gstringbuffer_class->release =
GST_DEBUG_FUNCPTR (gst_audioringbuffer_release);
gstringbuffer_class->play = GST_DEBUG_FUNCPTR (gst_audioringbuffer_play);
gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_audioringbuffer_play);
gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_audioringbuffer_stop);
gstringbuffer_class->delay = GST_DEBUG_FUNCPTR (gst_audioringbuffer_delay);
@ -144,7 +145,6 @@ audioringbuffer_thread_func (GstRingBuffer * buf)
GstAudioSinkClass *csink;
GstAudioRingBuffer *abuf = GST_AUDIORINGBUFFER (buf);
WriteFunc writefunc;
gint segsize, segtotal;
sink = GST_AUDIOSINK (GST_OBJECT_PARENT (buf));
csink = GST_AUDIOSINK_GET_CLASS (sink);
@ -155,53 +155,48 @@ audioringbuffer_thread_func (GstRingBuffer * buf)
if (writefunc == NULL)
goto no_function;
segsize = buf->spec.segsize;
segtotal = buf->spec.segtotal;
while (TRUE) {
if (g_atomic_int_get (&buf->state) == GST_RINGBUFFER_STATE_PLAYING) {
gint to_write, written;
guint8 *readptr;
gint readseg;
gint left, len;
guint8 *readptr;
gint readseg;
/* we write one segment */
to_write = segsize;
written = 0;
/* need to read and write the next segment */
readseg = (buf->playseg + 1) % segtotal;
/* get a pointer in the buffer to this segment */
readptr = gst_ringbuffer_prepare_read (buf, readseg);
if (gst_ringbuffer_prepare_read (buf, &readseg, &readptr, &len)) {
gint written = 0;
left = len;
do {
written = writefunc (sink, readptr + written, to_write);
if (written < 0 || written > to_write) {
perror ("error writing data\n");
GST_DEBUG ("transfer %d bytes from segment %d", left, readseg);
written = writefunc (sink, readptr + written, left);
GST_DEBUG ("transfered %d bytes", written);
if (written < 0 || written > left) {
GST_WARNING ("error writing data (reason: %s), skipping segment\n",
strerror (errno));
break;
}
to_write -= written;
} while (to_write > 0);
left -= written;
} while (left > 0);
/* clear written samples */
gst_ringbuffer_clear (buf, readseg);
/* we wrote one segment */
gst_ringbuffer_callback (buf, 1);
gst_ringbuffer_advance (buf, 1);
} else {
GST_LOCK (abuf);
GST_DEBUG ("signal wait");
GST_AUDIORINGBUFFER_SIGNAL (buf);
GST_DEBUG ("wait for play");
GST_DEBUG ("wait for action");
GST_AUDIORINGBUFFER_WAIT (buf);
GST_DEBUG ("got signal");
if (!abuf->running) {
GST_UNLOCK (abuf);
GST_DEBUG ("stop running");
goto done;
break;
}
GST_DEBUG ("continue running");
GST_UNLOCK (abuf);
}
}
done:
GST_DEBUG ("exit thread");
return;
@ -305,7 +300,7 @@ gst_audioringbuffer_play (GstRingBuffer * buf)
sink = GST_AUDIOSINK (GST_OBJECT_PARENT (buf));
GST_DEBUG ("play");
GST_DEBUG ("play, sending signal");
GST_AUDIORINGBUFFER_SIGNAL (buf);
return TRUE;
@ -321,11 +316,15 @@ gst_audioringbuffer_stop (GstRingBuffer * buf)
csink = GST_AUDIOSINK_GET_CLASS (sink);
/* unblock any pending writes to the audio device */
if (csink->reset)
if (csink->reset) {
GST_DEBUG ("reset...");
csink->reset (sink);
GST_DEBUG ("reset done");
}
GST_DEBUG ("stop");
GST_DEBUG ("stop, waiting...");
GST_AUDIORINGBUFFER_WAIT (buf);
GST_DEBUG ("stoped");
return TRUE;
}
@ -398,7 +397,9 @@ gst_audiosink_create_ringbuffer (GstBaseAudioSink * sink)
{
GstRingBuffer *buffer;
GST_DEBUG ("creating ringbuffer");
buffer = g_object_new (GST_TYPE_AUDIORINGBUFFER, NULL);
GST_DEBUG ("created ringbuffer @%p", buffer);
return buffer;
}

View file

@ -20,6 +20,8 @@
* Boston, MA 02111-1307, USA.
*/
#include <string.h>
#include "gstbaseaudiosink.h"
GST_DEBUG_CATEGORY_STATIC (gst_baseaudiosink_debug);
@ -32,13 +34,13 @@ enum
LAST_SIGNAL
};
#define DEFAULT_BUFFER -1
#define DEFAULT_LATENCY -1
#define DEFAULT_BUFFER_TIME 500 * GST_USECOND
#define DEFAULT_LATENCY_TIME 10 * GST_USECOND
enum
{
PROP_0,
PROP_BUFFER,
PROP_LATENCY,
PROP_BUFFER_TIME,
PROP_LATENCY_TIME,
};
#define _do_init(bla) \
@ -55,6 +57,10 @@ static void gst_baseaudiosink_get_property (GObject * object, guint prop_id,
static GstElementStateReturn gst_baseaudiosink_change_state (GstElement *
element);
static GstClock *gst_baseaudiosink_get_clock (GstElement * elem);
static GstClockTime gst_baseaudiosink_get_time (GstClock * clock,
GstBaseAudioSink * sink);
static GstFlowReturn gst_baseaudiosink_preroll (GstBaseSink * bsink,
GstBuffer * buffer);
static GstFlowReturn gst_baseaudiosink_render (GstBaseSink * bsink,
@ -87,17 +93,18 @@ gst_baseaudiosink_class_init (GstBaseAudioSinkClass * klass)
gobject_class->get_property =
GST_DEBUG_FUNCPTR (gst_baseaudiosink_get_property);
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER,
g_param_spec_uint64 ("buffer", "Buffer",
"Size of audio buffer in nanoseconds (-1 = default)",
0, G_MAXUINT64, DEFAULT_BUFFER, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_LATENCY,
g_param_spec_uint64 ("latency", "Latency",
"Audio latency in nanoseconds (-1 = default)",
0, G_MAXUINT64, DEFAULT_LATENCY, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_TIME,
g_param_spec_int64 ("buffer-time", "Buffer Time",
"Size of audio buffer in milliseconds (-1 = default)",
-1, G_MAXINT64, DEFAULT_BUFFER_TIME, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_LATENCY_TIME,
g_param_spec_int64 ("latency-time", "Latency Time",
"Audio latency in milliseconds (-1 = default)",
-1, G_MAXINT64, DEFAULT_LATENCY_TIME, G_PARAM_READWRITE));
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_baseaudiosink_change_state);
gstelement_class->get_clock = GST_DEBUG_FUNCPTR (gst_baseaudiosink_get_clock);
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_baseaudiosink_event);
gstbasesink_class->preroll = GST_DEBUG_FUNCPTR (gst_baseaudiosink_preroll);
@ -110,8 +117,38 @@ gst_baseaudiosink_class_init (GstBaseAudioSinkClass * klass)
static void
gst_baseaudiosink_init (GstBaseAudioSink * baseaudiosink)
{
baseaudiosink->buffer = DEFAULT_BUFFER;
baseaudiosink->latency = DEFAULT_LATENCY;
baseaudiosink->buffer_time = DEFAULT_BUFFER_TIME;
baseaudiosink->latency_time = DEFAULT_LATENCY_TIME;
baseaudiosink->clock = gst_audio_clock_new ("clock",
(GstAudioClockGetTimeFunc) gst_baseaudiosink_get_time, baseaudiosink);
}
static GstClock *
gst_baseaudiosink_get_clock (GstElement * elem)
{
GstBaseAudioSink *sink;
sink = GST_BASEAUDIOSINK (elem);
return GST_CLOCK (gst_object_ref (GST_OBJECT (sink->clock)));
}
static GstClockTime
gst_baseaudiosink_get_time (GstClock * clock, GstBaseAudioSink * sink)
{
guint64 samples;
GstClockTime result;
if (sink->ringbuffer == NULL || sink->ringbuffer->spec.rate == 0)
return 0;
samples = gst_ringbuffer_played_samples (sink->ringbuffer);
result = samples * GST_SECOND / sink->ringbuffer->spec.rate;
result += GST_ELEMENT (sink)->base_time;
return result;
}
static void
@ -123,9 +160,11 @@ gst_baseaudiosink_set_property (GObject * object, guint prop_id,
sink = GST_BASEAUDIOSINK (object);
switch (prop_id) {
case PROP_BUFFER:
case PROP_BUFFER_TIME:
sink->buffer_time = g_value_get_int64 (value);
break;
case PROP_LATENCY:
case PROP_LATENCY_TIME:
sink->latency_time = g_value_get_int64 (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -142,9 +181,11 @@ gst_baseaudiosink_get_property (GObject * object, guint prop_id, GValue * value,
sink = GST_BASEAUDIOSINK (object);
switch (prop_id) {
case PROP_BUFFER:
case PROP_BUFFER_TIME:
g_value_set_int64 (value, sink->buffer_time);
break;
case PROP_LATENCY:
case PROP_LATENCY_TIME:
g_value_set_int64 (value, sink->latency_time);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -152,25 +193,228 @@ gst_baseaudiosink_get_property (GObject * object, guint prop_id, GValue * value,
}
}
static int linear_formats[4 * 2 * 2] = {
GST_S8,
GST_S8,
GST_U8,
GST_U8,
GST_S16_LE,
GST_S16_BE,
GST_U16_LE,
GST_U16_BE,
GST_S24_LE,
GST_S24_BE,
GST_U24_LE,
GST_U24_BE,
GST_S32_LE,
GST_S32_BE,
GST_U32_LE,
GST_U32_BE
};
static int linear24_formats[3 * 2 * 2] = {
GST_S24_3LE,
GST_S24_3BE,
GST_U24_3LE,
GST_U24_3BE,
GST_S20_3LE,
GST_S20_3BE,
GST_U20_3LE,
GST_U20_3BE,
GST_S18_3LE,
GST_S18_3BE,
GST_U18_3LE,
GST_U18_3BE,
};
static GstBufferFormat
build_linear_format (int depth, int width, int unsignd, int big_endian)
{
if (width == 24) {
switch (depth) {
case 24:
depth = 0;
break;
case 20:
depth = 1;
break;
case 18:
depth = 2;
break;
default:
return GST_UNKNOWN;
}
return ((int (*)[2][2]) linear24_formats)[depth][!!unsignd][!!big_endian];
} else {
switch (depth) {
case 8:
depth = 0;
break;
case 16:
depth = 1;
break;
case 24:
depth = 2;
break;
case 32:
depth = 3;
break;
default:
return GST_UNKNOWN;
}
}
return ((int (*)[2][2]) linear_formats)[depth][!!unsignd][!!big_endian];
}
static void
debug_spec_caps (GstBaseAudioSink * sink, GstRingBufferSpec * spec)
{
GST_DEBUG ("spec caps: %p %" GST_PTR_FORMAT, spec->caps, spec->caps);
GST_DEBUG ("parsed caps: type: %d", spec->type);
GST_DEBUG ("parsed caps: format: %d", spec->format);
GST_DEBUG ("parsed caps: width: %d", spec->width);
GST_DEBUG ("parsed caps: depth: %d", spec->depth);
GST_DEBUG ("parsed caps: sign: %d", spec->sign);
GST_DEBUG ("parsed caps: bigend: %d", spec->bigend);
GST_DEBUG ("parsed caps: rate: %d", spec->rate);
GST_DEBUG ("parsed caps: channels: %d", spec->channels);
GST_DEBUG ("parsed caps: sample bytes: %d", spec->bytes_per_sample);
}
static void
debug_spec_buffer (GstBaseAudioSink * sink, GstRingBufferSpec * spec)
{
GST_DEBUG ("acquire ringbuffer: buffer time: %" G_GINT64_FORMAT " usec",
spec->buffer_time);
GST_DEBUG ("acquire ringbuffer: latency time: %" G_GINT64_FORMAT " usec",
spec->latency_time);
GST_DEBUG ("acquire ringbuffer: total segments: %d", spec->segtotal);
GST_DEBUG ("acquire ringbuffer: segment size: %d bytes = %d samples",
spec->segsize, spec->segsize / spec->bytes_per_sample);
GST_DEBUG ("acquire ringbuffer: buffer size: %d bytes = %d samples",
spec->segsize * spec->segtotal,
spec->segsize * spec->segtotal / spec->bytes_per_sample);
}
static gboolean
gst_baseaudiosink_setcaps (GstBaseSink * bsink, GstCaps * caps)
{
GstBaseAudioSink *sink = GST_BASEAUDIOSINK (bsink);
GstRingBufferSpec *spec;
const gchar *mimetype;
GstStructure *structure;
spec = &sink->ringbuffer->spec;
gst_caps_replace (&spec->caps, caps);
spec->buffersize = sink->buffer;
spec->latency = sink->latency;
structure = gst_caps_get_structure (caps, 0);
spec->segtotal = 0x7fff;
spec->segsize = 0x2048;
/* we have to differentiate between int and float formats */
mimetype = gst_structure_get_name (structure);
if (!strncmp (mimetype, "audio/x-raw-int", 15)) {
gint endianness;
spec->type = GST_BUFTYPE_LINEAR;
/* extract the needed information from the cap */
if (!(gst_structure_get_int (structure, "width", &spec->width) &&
gst_structure_get_int (structure, "depth", &spec->depth) &&
gst_structure_get_boolean (structure, "signed", &spec->sign)))
goto parse_error;
/* extract endianness if needed */
if (spec->width > 8) {
if (!gst_structure_get_int (structure, "endianness", &endianness))
goto parse_error;
} else {
endianness = G_BYTE_ORDER;
}
spec->bigend = endianness == G_LITTLE_ENDIAN ? FALSE : TRUE;
spec->format =
build_linear_format (spec->depth, spec->width, spec->sign ? 0 : 1,
spec->bigend ? 1 : 0);
} else if (!strncmp (mimetype, "audio/x-raw-float", 17)) {
spec->type = GST_BUFTYPE_FLOAT;
/* get layout */
if (!gst_structure_get_int (structure, "width", &spec->width))
goto parse_error;
/* match layout to format wrt to endianness */
switch (spec->width) {
case 32:
spec->format =
G_BYTE_ORDER == G_LITTLE_ENDIAN ? GST_FLOAT32_LE : GST_FLOAT32_BE;
break;
case 64:
spec->format =
G_BYTE_ORDER == G_LITTLE_ENDIAN ? GST_FLOAT64_LE : GST_FLOAT64_BE;
break;
default:
goto parse_error;
}
} else if (!strncmp (mimetype, "audio/x-alaw", 12)) {
spec->type = GST_BUFTYPE_A_LAW;
spec->format = GST_A_LAW;
} else if (!strncmp (mimetype, "audio/x-mulaw", 13)) {
spec->type = GST_BUFTYPE_MU_LAW;
spec->format = GST_MU_LAW;
} else {
goto parse_error;
}
/* get rate and channels */
if (!(gst_structure_get_int (structure, "rate", &spec->rate) &&
gst_structure_get_int (structure, "channels", &spec->channels)))
goto parse_error;
spec->bytes_per_sample = (spec->width >> 3) * spec->channels;
gst_caps_replace (&spec->caps, caps);
debug_spec_caps (sink, spec);
spec->buffer_time = sink->buffer_time;
spec->latency_time = sink->latency_time;
/* calculate suggested segsize and segtotal */
spec->segsize =
spec->rate * spec->bytes_per_sample * spec->latency_time / GST_MSECOND;
spec->segtotal = spec->buffer_time / spec->latency_time;
GST_DEBUG ("release old ringbuffer");
gst_ringbuffer_release (sink->ringbuffer);
gst_ringbuffer_acquire (sink->ringbuffer, spec);
debug_spec_buffer (sink, spec);
if (!gst_ringbuffer_acquire (sink->ringbuffer, spec))
goto acquire_error;
/* calculate actual latency and buffer times */
spec->latency_time =
spec->segsize * GST_MSECOND / (spec->rate * spec->bytes_per_sample);
spec->buffer_time =
spec->segtotal * spec->segsize * GST_MSECOND / (spec->rate *
spec->bytes_per_sample);
debug_spec_buffer (sink, spec);
return TRUE;
/* ERRORS */
parse_error:
{
return FALSE;
}
acquire_error:
{
return FALSE;
}
}
static void
@ -184,6 +428,36 @@ gst_baseaudiosink_get_times (GstBaseSink * bsink, GstBuffer * buffer,
static void
gst_baseaudiosink_event (GstBaseSink * bsink, GstEvent * event)
{
GstBaseAudioSink *sink = GST_BASEAUDIOSINK (bsink);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH:
if (GST_EVENT_FLUSH_DONE (event)) {
} else {
gst_ringbuffer_pause (sink->ringbuffer);
}
break;
case GST_EVENT_DISCONTINUOUS:
{
guint64 time, sample;
if (gst_event_discont_get_value (event, GST_FORMAT_DEFAULT, &sample,
NULL))
goto have_value;
if (gst_event_discont_get_value (event, GST_FORMAT_TIME, &time, NULL)) {
sample = time * sink->ringbuffer->spec.rate / GST_SECOND;
goto have_value;
}
g_warning ("discont without valid timestamp");
sample = 0;
have_value:
gst_ringbuffer_set_sample (sink->ringbuffer, sample);
break;
}
default:
break;
}
}
static GstFlowReturn
@ -195,9 +469,14 @@ gst_baseaudiosink_preroll (GstBaseSink * bsink, GstBuffer * buffer)
static GstFlowReturn
gst_baseaudiosink_render (GstBaseSink * bsink, GstBuffer * buf)
{
guint64 offset;
GstBaseAudioSink *sink = GST_BASEAUDIOSINK (bsink);
gst_ringbuffer_commit (sink->ringbuffer, 0,
offset = GST_BUFFER_OFFSET (buf);
GST_DEBUG ("in offset %llu, time %lld", offset, GST_BUFFER_TIMESTAMP (buf));
gst_ringbuffer_commit (sink->ringbuffer, offset,
GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
return GST_FLOW_OK;
@ -221,7 +500,8 @@ gst_baseaudiosink_create_ringbuffer (GstBaseAudioSink * sink)
}
void
gst_baseaudiosink_callback (GstRingBuffer * rbuf, guint advance, gpointer data)
gst_baseaudiosink_callback (GstRingBuffer * rbuf, guint8 * data, guint len,
gpointer user_data)
{
//GstBaseAudioSink *sink = GST_BASEAUDIOSINK (data);
}
@ -251,9 +531,14 @@ gst_baseaudiosink_change_state (GstElement * element)
switch (transition) {
case GST_STATE_PLAYING_TO_PAUSED:
gst_ringbuffer_stop (sink->ringbuffer);
gst_ringbuffer_pause (sink->ringbuffer);
/*
while (gst_ringbuffer_delay (sink->ringbuffer) > 0)
g_usleep (100);
*/
break;
case GST_STATE_PAUSED_TO_READY:
gst_ringbuffer_stop (sink->ringbuffer);
gst_ringbuffer_release (sink->ringbuffer);
gst_object_unref (GST_OBJECT (sink->ringbuffer));
break;

View file

@ -52,6 +52,7 @@
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include "gstringbuffer.h"
#include "gstaudioclock.h"
G_BEGIN_DECLS
@ -75,8 +76,11 @@ struct _GstBaseAudioSink {
GstRingBuffer *ringbuffer;
/* required buffer and latency */
GstClockTime buffer;
GstClockTime latency;
GstClockTime buffer_time;
GstClockTime latency_time;
/* clock */
GstClock *clock;
};
struct _GstBaseAudioSinkClass {

View file

@ -23,6 +23,9 @@
#include "gstringbuffer.h"
GST_DEBUG_CATEGORY_STATIC (gst_ringbuffer_debug);
#define GST_CAT_DEFAULT gst_ringbuffer_debug
static void gst_ringbuffer_class_init (GstRingBufferClass * klass);
static void gst_ringbuffer_init (GstRingBuffer * ringbuffer);
static void gst_ringbuffer_dispose (GObject * object);
@ -52,6 +55,9 @@ gst_ringbuffer_get_type (void)
ringbuffer_type = g_type_register_static (GST_TYPE_OBJECT, "GstRingBuffer",
&ringbuffer_info, G_TYPE_FLAG_ABSTRACT);
GST_DEBUG_CATEGORY_INIT (gst_ringbuffer_debug, "ringbuffer", 0,
"ringbuffer class");
}
return ringbuffer_type;
}
@ -76,12 +82,9 @@ gst_ringbuffer_init (GstRingBuffer * ringbuffer)
{
ringbuffer->acquired = FALSE;
ringbuffer->state = GST_RINGBUFFER_STATE_STOPPED;
ringbuffer->playseg = -1;
ringbuffer->writeseg = -1;
ringbuffer->segfilled = 0;
ringbuffer->freeseg = -1;
ringbuffer->segplayed = 0;
ringbuffer->cond = g_cond_new ();
ringbuffer->waiting = 0;
ringbuffer->empty_seg = NULL;
}
static void
@ -98,6 +101,7 @@ gst_ringbuffer_finalize (GObject * object)
GstRingBuffer *ringbuffer = GST_RINGBUFFER (object);
g_cond_free (ringbuffer->cond);
g_free (ringbuffer->empty_seg);
G_OBJECT_CLASS (parent_class)->finalize (G_OBJECT (ringbuffer));
}
@ -117,6 +121,8 @@ void
gst_ringbuffer_set_callback (GstRingBuffer * buf, GstRingBufferCallback cb,
gpointer data)
{
g_return_if_fail (buf != NULL);
GST_LOCK (buf);
buf->callback = cb;
buf->cb_data = data;
@ -142,6 +148,8 @@ gst_ringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
gboolean res = FALSE;
GstRingBufferClass *rclass;
g_return_val_if_fail (buf != NULL, FALSE);
GST_LOCK (buf);
if (buf->acquired) {
res = TRUE;
@ -156,12 +164,25 @@ gst_ringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
if (!res) {
buf->acquired = FALSE;
} else {
buf->freeseg = spec->segtotal;
if (buf->spec.bytes_per_sample != 0) {
gint i, j;
buf->samples_per_seg = buf->spec.segsize / buf->spec.bytes_per_sample;
/* create an empty segment */
g_free (buf->empty_seg);
buf->empty_seg = g_malloc (buf->spec.segsize);
for (i = 0, j = 0; i < buf->spec.segsize; i++) {
buf->empty_seg[i] = buf->spec.silence_sample[j];
j = (j + 1) % buf->spec.bytes_per_sample;
}
/* set sample position to 0 */
gst_ringbuffer_set_sample (buf, 0);
} else {
g_warning ("invalid bytes_per_sample from acquire ringbuffer");
buf->samples_per_seg = buf->spec.segsize;
g_warning
("invalid bytes_per_sample from acquire ringbuffer, fix the element");
buf->acquired = FALSE;
res = FALSE;
}
}
done:
@ -186,6 +207,8 @@ gst_ringbuffer_release (GstRingBuffer * buf)
gboolean res = FALSE;
GstRingBufferClass *rclass;
g_return_val_if_fail (buf != NULL, FALSE);
gst_ringbuffer_stop (buf);
GST_LOCK (buf);
@ -201,6 +224,9 @@ gst_ringbuffer_release (GstRingBuffer * buf)
if (!res) {
buf->acquired = TRUE;
} else {
g_free (buf->empty_seg);
buf->empty_seg = NULL;
}
done:
@ -209,33 +235,6 @@ done:
return res;
}
static gboolean
gst_ringbuffer_play_unlocked (GstRingBuffer * buf)
{
gboolean res = FALSE;
GstRingBufferClass *rclass;
/* if paused, set to playing */
res = g_atomic_int_compare_and_exchange (&buf->state,
GST_RINGBUFFER_STATE_STOPPED, GST_RINGBUFFER_STATE_PLAYING);
if (!res) {
/* was not stopped */
res = TRUE;
goto done;
}
rclass = GST_RINGBUFFER_GET_CLASS (buf);
if (rclass->play)
res = rclass->play (buf);
if (!res) {
buf->state = GST_RINGBUFFER_STATE_PAUSED;
}
done:
return res;
}
/**
* gst_ringbuffer_play:
* @buf: the #GstRingBuffer to play
@ -250,9 +249,42 @@ gboolean
gst_ringbuffer_play (GstRingBuffer * buf)
{
gboolean res = FALSE;
GstRingBufferClass *rclass;
gboolean resume = FALSE;
g_return_val_if_fail (buf != NULL, FALSE);
GST_LOCK (buf);
res = gst_ringbuffer_play_unlocked (buf);
/* if paused, set to playing */
res = g_atomic_int_compare_and_exchange (&buf->state,
GST_RINGBUFFER_STATE_STOPPED, GST_RINGBUFFER_STATE_PLAYING);
if (!res) {
/* was not stopped, try from paused */
res = g_atomic_int_compare_and_exchange (&buf->state,
GST_RINGBUFFER_STATE_PAUSED, GST_RINGBUFFER_STATE_PLAYING);
if (!res) {
/* was not paused either, must be playing then */
res = TRUE;
goto done;
}
resume = TRUE;
}
rclass = GST_RINGBUFFER_GET_CLASS (buf);
if (resume) {
if (rclass->resume)
res = rclass->resume (buf);
} else {
if (rclass->play)
res = rclass->play (buf);
}
if (!res) {
buf->state = GST_RINGBUFFER_STATE_PAUSED;
}
done:
GST_UNLOCK (buf);
return res;
@ -274,6 +306,8 @@ gst_ringbuffer_pause (GstRingBuffer * buf)
gboolean res = FALSE;
GstRingBufferClass *rclass;
g_return_val_if_fail (buf != NULL, FALSE);
GST_LOCK (buf);
/* if playing, set to paused */
res = g_atomic_int_compare_and_exchange (&buf->state,
@ -301,50 +335,6 @@ done:
return res;
}
/**
* gst_ringbuffer_resume:
* @buf: the #GstRingBuffer to resume
*
* Resume playing samples from the ringbuffer in the paused state.
*
* Returns: TRUE if the device could be paused, FALSE on error.
*
* MT safe.
*/
gboolean
gst_ringbuffer_resume (GstRingBuffer * buf)
{
gboolean res = FALSE;
GstRingBufferClass *rclass;
GST_LOCK (buf);
/* if playing, set to paused */
res = g_atomic_int_compare_and_exchange (&buf->state,
GST_RINGBUFFER_STATE_PAUSED, GST_RINGBUFFER_STATE_PLAYING);
if (!res) {
/* was not paused */
res = TRUE;
goto done;
}
/* signal any waiters */
GST_RINGBUFFER_SIGNAL (buf);
rclass = GST_RINGBUFFER_GET_CLASS (buf);
if (rclass->resume)
res = rclass->resume (buf);
if (!res) {
buf->state = GST_RINGBUFFER_STATE_PAUSED;
}
done:
GST_UNLOCK (buf);
return res;
}
/**
* gst_ringbuffer_stop:
* @buf: the #GstRingBuffer to stop
@ -361,6 +351,8 @@ gst_ringbuffer_stop (GstRingBuffer * buf)
gboolean res = FALSE;
GstRingBufferClass *rclass;
g_return_val_if_fail (buf != NULL, FALSE);
GST_LOCK (buf);
/* if playing, set to stopped */
res = g_atomic_int_compare_and_exchange (&buf->state,
@ -382,11 +374,7 @@ gst_ringbuffer_stop (GstRingBuffer * buf)
if (!res) {
buf->state = GST_RINGBUFFER_STATE_PLAYING;
} else {
buf->segfilled = 0;
buf->playseg = -1;
buf->writeseg = -1;
buf->freeseg = buf->spec.segtotal;
buf->segplayed = 0;
gst_ringbuffer_set_sample (buf, 0);
}
done:
GST_UNLOCK (buf);
@ -394,55 +382,6 @@ done:
return res;
}
/**
* gst_ringbuffer_callback:
* @buf: the #GstRingBuffer to callback
* @advance: the number of segments written
*
* Subclasses should call this function to notify the fact that
* @advance segments are now played by the device.
*
* MT safe.
*/
void
gst_ringbuffer_callback (GstRingBuffer * buf, guint advance)
{
gint prevfree;
gint segtotal;
if (advance == 0)
return;
segtotal = buf->spec.segtotal;
/* update counter */
g_atomic_int_add (&buf->segplayed, advance);
/* update free segments counter */
prevfree = g_atomic_int_exchange_and_add (&buf->freeseg, advance);
if (prevfree + advance > segtotal) {
g_warning ("underrun!! read %d, write %d, advance %d, free %d, prevfree %d",
buf->playseg, buf->writeseg, advance, buf->freeseg, prevfree);
buf->freeseg = segtotal;
buf->writeseg = buf->playseg;
/* make sure to signal */
prevfree = -1;
}
buf->playseg = (buf->playseg + advance) % segtotal;
if (prevfree == -1) {
/* we need to take the lock to make sure the other thread is
* blocking in the wait */
GST_LOCK (buf);
GST_RINGBUFFER_SIGNAL (buf);
GST_UNLOCK (buf);
}
if (buf->callback)
buf->callback (buf, advance, buf->cb_data);
}
/**
* gst_ringbuffer_delay:
* @buf: the #GstRingBuffer to query
@ -462,6 +401,8 @@ gst_ringbuffer_delay (GstRingBuffer * buf)
GstRingBufferClass *rclass;
guint res = 0;
g_return_val_if_fail (buf != NULL, 0);
rclass = GST_RINGBUFFER_GET_CLASS (buf);
if (rclass->delay)
res = rclass->delay (buf);
@ -484,19 +425,100 @@ guint64
gst_ringbuffer_played_samples (GstRingBuffer * buf)
{
gint segplayed;
guint64 samples;
guint64 raw, samples;
guint delay;
g_return_val_if_fail (buf != NULL, 0);
/* get the amount of segments we played */
segplayed = g_atomic_int_get (&buf->segplayed);
/* and the number of samples not yet played */
delay = gst_ringbuffer_delay (buf);
samples = (segplayed * buf->samples_per_seg) - delay;
samples = (segplayed * buf->samples_per_seg);
raw = samples;
if (samples >= delay)
samples -= delay;
GST_DEBUG ("played samples: raw %llu, delay %u, real %llu", raw, delay,
samples);
return samples;
}
/**
* gst_ringbuffer_set_sample:
* @buf: the #GstRingBuffer to use
* @sample: the sample number to set
*
* Make sure that the next sample written to the device is
* accounted for as being the @sample sample written to the
* device. This value will be used in reporting the current
* sample position of the ringbuffer.
*
* This function will also clear the buffer with silence.
*
* MT safe.
*/
void
gst_ringbuffer_set_sample (GstRingBuffer * buf, guint64 sample)
{
gint i;
g_return_if_fail (buf != NULL);
if (sample == -1)
sample = 0;
/* FIXME, we assume the ringbuffer can restart at a random
* position, round down to the beginning and keep track of
* offset when calculating the played samples. */
buf->segplayed = sample / buf->samples_per_seg;
buf->next_sample = sample;
for (i = 0; i < buf->spec.segtotal; i++) {
gst_ringbuffer_clear (buf, i);
}
GST_DEBUG ("setting sample to %llu, segplayed %d", sample, buf->segplayed);
}
static gboolean
wait_segment (GstRingBuffer * buf)
{
/* buffer must be playing now or we deadlock since nobody is reading */
if (g_atomic_int_get (&buf->state) != GST_RINGBUFFER_STATE_PLAYING) {
GST_DEBUG ("play!");
gst_ringbuffer_play (buf);
}
/* take lock first, then update our waiting flag */
GST_LOCK (buf);
if (g_atomic_int_compare_and_exchange (&buf->waiting, 0, 1)) {
GST_DEBUG ("waiting..");
if (g_atomic_int_get (&buf->state) != GST_RINGBUFFER_STATE_PLAYING)
goto not_playing;
GST_RINGBUFFER_WAIT (buf);
if (g_atomic_int_get (&buf->state) != GST_RINGBUFFER_STATE_PLAYING)
goto not_playing;
}
GST_UNLOCK (buf);
return TRUE;
/* ERROR */
not_playing:
{
GST_UNLOCK (buf);
GST_DEBUG ("stopped playing");
return FALSE;
}
}
/**
* gst_ringbuffer_commit:
* @buf: the #GstRingBuffer to commit
@ -511,106 +533,106 @@ gst_ringbuffer_played_samples (GstRingBuffer * buf)
* @len should not be a multiple of the segment size of the ringbuffer
* although it is recommended.
*
* Returns: The number of samples written to the ringbuffer.
* Returns: The number of samples written to the ringbuffer or -1 on
* error.
*
* MT safe.
*/
/* FIXME, write the samples into the right position in the ringbuffer based
* on the sample position argument
*/
guint
gst_ringbuffer_commit (GstRingBuffer * buf, guint64 sample, guchar * data,
guint len)
{
guint towrite = len;
gint segsize, segtotal;
gint segplayed;
gint segsize, segtotal, bps, sps;
guint8 *dest;
if (buf->data == NULL)
goto no_buffer;
g_return_val_if_fail (buf != NULL, -1);
g_return_val_if_fail (buf->data != NULL, -1);
g_return_val_if_fail (data != NULL, -1);
if (sample == -1) {
/* play aligned with last sample */
sample = buf->next_sample;
} else {
if (sample != buf->next_sample) {
GST_WARNING ("discontinuity found got %" G_GUINT64_FORMAT
", expected %" G_GUINT64_FORMAT, sample, buf->next_sample);
}
}
dest = GST_BUFFER_DATA (buf->data);
segsize = buf->spec.segsize;
segtotal = buf->spec.segtotal;
bps = buf->spec.bytes_per_sample;
sps = buf->samples_per_seg;
/* we write the complete buffer in chunks of segsize so that we can check for
* a filled buffer after each segment. */
while (towrite > 0) {
gint segavail;
gint segwrite;
gint writeseg;
gint segfilled;
/* we assume the complete buffer will be consumed and the next sample
* should be written after this */
buf->next_sample = sample + len / bps;
segfilled = buf->segfilled;
/* write out all bytes */
while (len > 0) {
gint writelen;
gint writeseg, writeoff;
/* check for partial buffer */
if (G_LIKELY (segfilled == 0)) {
gint prevfree;
gint newseg;
/* figure out the segment and the offset inside the segment where
* the sample should be written. */
writeseg = sample / sps;
writeoff = (sample % sps) * bps;
/* no partial buffer to fill up, allocate a new one */
prevfree = g_atomic_int_exchange_and_add (&buf->freeseg, -1);
if (prevfree == 0) {
/* nothing was free */
GST_DEBUG ("filled %d %d", buf->writeseg, buf->playseg);
while (TRUE) {
gint diff;
GST_LOCK (buf);
/* buffer must be playing now or we deadlock since nobody is reading */
if (g_atomic_int_get (&buf->state) != GST_RINGBUFFER_STATE_PLAYING)
gst_ringbuffer_play_unlocked (buf);
/* get the currently playing segment */
segplayed = g_atomic_int_get (&buf->segplayed);
GST_RINGBUFFER_WAIT (buf);
if (g_atomic_int_get (&buf->state) != GST_RINGBUFFER_STATE_PLAYING)
goto not_playing;
GST_UNLOCK (buf);
/* see how far away it is from the write segment */
diff = writeseg - segplayed;
GST_DEBUG
("pointer at %d, sample %llu, write to %d-%d, len %d, diff %d, segtotal %d, segsize %d",
segplayed, sample, writeseg, writeoff, len, diff, segtotal, segsize);
/* play segment too far ahead, we need to drop */
if (diff < 0) {
/* we need to drop one segment at a time, pretend we wrote a
* segment. */
writelen = MIN (segsize, len);
goto next;
}
/* need to do this atomic as the reader updates the write pointer on
* overruns */
do {
writeseg = g_atomic_int_get (&buf->writeseg);
newseg = (writeseg + 1) % segtotal;
} while (!g_atomic_int_compare_and_exchange (&buf->writeseg, writeseg,
newseg));
writeseg = newseg;
} else {
/* this is the segment we should write to */
writeseg = g_atomic_int_get (&buf->writeseg);
}
if (writeseg < 0 || writeseg > segtotal) {
g_warning ("invalid segment %d", writeseg);
writeseg = 0;
/* write segment is within writable range, we can break the loop and
* start writing the data. */
if (diff < segtotal)
break;
/* else we need to wait for the segment to become writable. */
if (!wait_segment (buf))
goto not_playing;
}
/* this is the available size now in the current segment */
segavail = segsize - segfilled;
/* we can write now */
writeseg = writeseg % segtotal;
writelen = MIN (segsize - writeoff, len);
/* we write up to the available space */
segwrite = MIN (segavail, towrite);
GST_DEBUG ("write @%p seg %d, off %d, len %d",
dest + writeseg * segsize, writeseg, writeoff, writelen);
memcpy (dest + writeseg * segsize + segfilled, data, segwrite);
memcpy (dest + writeseg * segsize + writeoff, data, writelen);
towrite -= segwrite;
data += segwrite;
if (segfilled + segwrite == segsize) {
buf->segfilled = 0;
} else {
buf->segfilled = segfilled + segwrite;
}
next:
len -= writelen;
data += writelen;
sample += writelen / bps;
}
return len - towrite;
no_buffer:
{
GST_DEBUG ("no buffer");
return -1;
}
return len;
/* ERRORS */
not_playing:
{
GST_UNLOCK (buf);
GST_DEBUG ("stopped playing");
return len - towrite;
return -1;
}
}
@ -618,20 +640,78 @@ not_playing:
* gst_ringbuffer_prepare_read:
* @buf: the #GstRingBuffer to read from
* @segment: the segment to read
* @readptr: the pointer to the memory where samples can be read
* @len: the number of bytes to read
*
* Returns a pointer to memory where the data from segment @segment
* can be found.
* can be found. This function is used by subclasses.
*
* Returns: FALSE if the buffer is not playing.
*
* MT safe.
*/
guint8 *
gst_ringbuffer_prepare_read (GstRingBuffer * buf, gint segment)
gboolean
gst_ringbuffer_prepare_read (GstRingBuffer * buf, gint * segment,
guint8 ** readptr, gint * len)
{
guint8 *data;
gint segplayed;
/* buffer must be playing */
if (g_atomic_int_get (&buf->state) != GST_RINGBUFFER_STATE_PLAYING)
return FALSE;
g_return_val_if_fail (buf != NULL, FALSE);
g_return_val_if_fail (buf->data != NULL, FALSE);
g_return_val_if_fail (readptr != NULL, FALSE);
g_return_val_if_fail (len != NULL, FALSE);
data = GST_BUFFER_DATA (buf->data);
return data + (segment % buf->spec.segtotal) * buf->spec.segsize;
/* get the position of the play pointer */
segplayed = g_atomic_int_get (&buf->segplayed);
*segment = segplayed % buf->spec.segtotal;
*len = buf->spec.segsize;
*readptr = data + *segment * *len;
/* callback to fill the memory with data */
if (buf->callback)
buf->callback (buf, *readptr, *len, buf->cb_data);
GST_DEBUG ("prepare read from segment %d (real %d) @%p",
*segment, segplayed, *readptr);
return TRUE;
}
/**
* gst_ringbuffer_advance:
* @buf: the #GstRingBuffer to advance
* @advance: the number of segments written
*
* Subclasses should call this function to notify the fact that
* @advance segments are now played by the device.
*
* MT safe.
*/
void
gst_ringbuffer_advance (GstRingBuffer * buf, guint advance)
{
g_return_if_fail (buf != NULL);
/* update counter */
g_atomic_int_add (&buf->segplayed, advance);
/* the lock is already taken when the waiting flag is set,
* we grab the lock as well to make sure the waiter is actually
* waiting for the signal */
if (g_atomic_int_compare_and_exchange (&buf->waiting, 1, 0)) {
GST_LOCK (buf);
GST_DEBUG ("signal waiter");
GST_RINGBUFFER_SIGNAL (buf);
GST_UNLOCK (buf);
}
}
/**
@ -649,8 +729,13 @@ gst_ringbuffer_clear (GstRingBuffer * buf, gint segment)
{
guint8 *data;
data = GST_BUFFER_DATA (buf->data);
g_return_if_fail (buf != NULL);
g_return_if_fail (buf->data != NULL);
g_return_if_fail (buf->empty_seg != NULL);
memset (data + (segment % buf->spec.segtotal) * buf->spec.segsize, 0,
buf->spec.segsize);
data = GST_BUFFER_DATA (buf->data);
data += (segment % buf->spec.segtotal) * buf->spec.segsize,
GST_DEBUG ("clear segment %d @%p", segment, data);
memcpy (data, buf->empty_seg, buf->spec.segsize);
}

View file

@ -38,7 +38,8 @@ typedef struct _GstRingBuffer GstRingBuffer;
typedef struct _GstRingBufferClass GstRingBufferClass;
typedef struct _GstRingBufferSpec GstRingBufferSpec;
typedef void (*GstRingBufferCallback) (GstRingBuffer *rbuf, guint advance, gpointer data);
/* called to fill data with len bytes of samples */
typedef void (*GstRingBufferCallback) (GstRingBuffer *rbuf, guint8* data, guint len, gpointer user_data);
typedef enum {
GST_RINGBUFFER_STATE_STOPPED,
@ -55,21 +56,64 @@ typedef enum {
typedef enum
{
GST_U8,
GST_BUFTYPE_LINEAR,
GST_BUFTYPE_FLOAT,
GST_BUFTYPE_MU_LAW,
GST_BUFTYPE_A_LAW,
GST_BUFTYPE_IMA_ADPCM,
GST_BUFTYPE_MPEG,
GST_BUFTYPE_GSM,
} GstBufferFormatType;
typedef enum
{
GST_UNKNOWN,
GST_S8,
GST_U8,
GST_U16_LE,
GST_S16_LE,
GST_U16_BE,
GST_S16_BE,
GST_U16_LE,
GST_U16_BE,
GST_U24_LE,
GST_S24_LE,
GST_U24_BE,
GST_S24_BE,
GST_U24_LE,
GST_U24_BE,
GST_S32_LE,
GST_S32_BE,
GST_U32_LE,
GST_U32_BE,
GST_S24_3LE,
GST_S24_3BE,
GST_U24_3LE,
GST_U24_3BE,
GST_S20_3LE,
GST_S20_3BE,
GST_U20_3LE,
GST_U20_3BE,
GST_S18_3LE,
GST_S18_3BE,
GST_U18_3LE,
GST_U18_3BE,
GST_FLOAT32_LE,
GST_FLOAT32_BE,
GST_FLOAT64_LE,
GST_FLOAT64_BE,
GST_MU_LAW,
GST_A_LAW,
GST_IMA_ADPCM,
GST_MPEG,
GST_GSM,
/* fill me */
GST_FLOAT_LE,
GST_FLOAT_BE,
} GstBufferFormat;
struct _GstRingBufferSpec
@ -78,12 +122,17 @@ struct _GstRingBufferSpec
GstCaps *caps; /* the caps of the buffer */
/* in/out */
GstBufferFormatType type;
GstBufferFormat format;
gboolean sign;
gboolean bigend;
gint width;
gint depth;
gint rate;
gint channels;
GstClockTime latency; /* the required/actual latency */
GstClockTime buffersize; /* the required/actual size of the buffer */
GstClockTime latency_time; /* the required/actual latency time */
GstClockTime buffer_time; /* the required/actual time of the buffer */
gint segsize; /* size of one buffer segement */
gint segtotal; /* total number of segments */
@ -107,18 +156,15 @@ struct _GstRingBuffer {
GstRingBufferSpec spec;
GstRingBufferSegState *segstate;
gint samples_per_seg; /* number of samples per segment */
guint8 *empty_seg;
/*< public >*/ /* ATOMIC */
gint state; /* state of the buffer */
gint freeseg; /* number of free segments */
gint segplayed; /* number of segments played since last start */
/*< protected >*/
gint playseg; /* segment currently playing */
gint writeseg; /* segment currently written */
gint segfilled; /* bytes used in current write segment */
gint waiting; /* when waiting for a segment to be freed */
/*< private >*/
guint64 next_sample; /* the next sample we need to write */
GstRingBufferCallback callback;
gpointer cb_data;
};
@ -146,8 +192,7 @@ GType gst_ringbuffer_get_type(void);
/* callback stuff */
void gst_ringbuffer_set_callback (GstRingBuffer *buf, GstRingBufferCallback cb,
gpointer data);
void gst_ringbuffer_callback (GstRingBuffer *buf, guint advance);
gpointer user_data);
/* allocate resources */
gboolean gst_ringbuffer_acquire (GstRingBuffer *buf, GstRingBufferSpec *spec);
@ -156,20 +201,22 @@ gboolean gst_ringbuffer_release (GstRingBuffer *buf);
/* playback/pause */
gboolean gst_ringbuffer_play (GstRingBuffer *buf);
gboolean gst_ringbuffer_pause (GstRingBuffer *buf);
gboolean gst_ringbuffer_resume (GstRingBuffer *buf);
gboolean gst_ringbuffer_stop (GstRingBuffer *buf);
/* get status */
guint gst_ringbuffer_delay (GstRingBuffer *buf);
guint64 gst_ringbuffer_played_samples (GstRingBuffer *buf);
void gst_ringbuffer_set_sample (GstRingBuffer *buf, guint64 sample);
/* commit samples */
guint gst_ringbuffer_commit (GstRingBuffer *buf, guint64 sample,
guchar *data, guint len);
/* mostly protected */
guint8* gst_ringbuffer_prepare_read (GstRingBuffer *buf, gint segment);
gboolean gst_ringbuffer_prepare_read (GstRingBuffer *buf, gint *segment, guint8 **readptr, gint *len);
void gst_ringbuffer_clear (GstRingBuffer *buf, gint segment);
void gst_ringbuffer_advance (GstRingBuffer *buf, guint advance);
G_END_DECLS