pulsesink: Add support for compressed formats

This adds support for various compressed formats (AC3, E-AC3, DTS and
MP3) payloaded in IEC 61937 format (used for transmission over S/PDIF,
HDMI and Bluetooth).

The acceptcaps() function allows bins to probe for what formats the sink
being connected to support. This only works after the element is set to
at least READY.

If the underlying sink changes and the format we are streaming is not
available, we emit a message that will allow upstream elements/bins to
block and renegotiate a new format.
This commit is contained in:
Arun Raghavan 2011-03-09 11:04:36 +05:30
parent a67b536741
commit ac7cad431c
3 changed files with 394 additions and 38 deletions

View file

@ -54,6 +54,7 @@
#include <gst/gsttaglist.h>
#include <gst/interfaces/streamvolume.h>
#include <gst/gst-i18n-plugin.h>
#include <gst/audio/gstaudioiec61937.h>
#include <gst/pbutils/pbutils.h> /* only used for GST_PLUGINS_BASE_VERSION_* */
@ -138,7 +139,7 @@ struct _GstPulseRingBuffer
pa_context *context;
pa_stream *stream;
#if HAVE_PULSE_1_0
#ifdef HAVE_PULSE_1_0
pa_format_info *format;
guint channels;
#else
@ -712,11 +713,60 @@ gst_pulsering_stream_event_cb (pa_stream * p, const char *name,
gst_element_post_message (GST_ELEMENT_CAST (psink),
gst_message_new_request_state (GST_OBJECT_CAST (psink),
GST_STATE_PLAYING));
#ifdef HAVE_PULSE_1_0
} else if (!strcmp (name, PA_STREAM_EVENT_FORMAT_LOST)) {
GstEvent *renego;
if (g_atomic_int_get (&psink->format_lost)) {
/* Duplicate event before we're done reconfiguring, discard */
return;
}
GST_DEBUG_OBJECT (psink, "got FORMAT LOST");
g_atomic_int_set (&psink->format_lost, 1);
psink->format_lost_time = g_ascii_strtoull (pa_proplist_gets (pl,
"stream-time"), NULL, 0) * 1000;
g_free (psink->device);
psink->device = g_strdup (pa_proplist_gets (pl, "device"));
renego = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("pulse-format-lost", NULL));
if (!gst_pad_push_event (GST_BASE_SINK (psink)->sinkpad, renego)) {
/* Nobody handled the format change - emit an error */
GST_ELEMENT_ERROR (psink, STREAM, FORMAT, ("Sink format changed"),
("Sink format changed"));
}
#endif
} else {
GST_DEBUG_OBJECT (psink, "got unknown event %s", name);
}
}
/* Called with the mainloop locked */
static gboolean
gst_pulsering_wait_for_stream_ready (GstPulseSink * psink, pa_stream * stream)
{
pa_stream_state_t state;
for (;;) {
state = pa_stream_get_state (stream);
GST_LOG_OBJECT (psink, "stream state is now %d", state);
if (!PA_STREAM_IS_GOOD (state))
return FALSE;
if (state == PA_STREAM_READY)
return TRUE;
/* Wait until the stream is ready */
pa_threaded_mainloop_wait (mainloop);
}
}
/* This method should create a new stream of the given @spec. No playback should
* start yet so we start in the corked state. */
static gboolean
@ -737,6 +787,9 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
GstAudioClock *clock;
#ifdef HAVE_PULSE_1_0
pa_format_info *formats[1];
#ifndef GST_DISABLE_GST_DEBUG
gchar print_buf[PA_FORMAT_INFO_SNPRINT_MAX];
#endif
#endif
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
@ -833,6 +886,10 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
#ifdef HAVE_PULSE_1_0
if (pa_format_info_is_pcm (pbuf->format))
gst_pulse_cvolume_from_linear (pv, pbuf->channels, psink->volume);
else {
GST_DEBUG_OBJECT (psink, "passthrough stream, not setting volume");
pv = NULL;
}
#else
gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
psink->volume);
@ -863,22 +920,19 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
gst_audio_clock_reset (clock, 0);
for (;;) {
pa_stream_state_t state;
if (!gst_pulsering_wait_for_stream_ready (psink, pbuf->stream))
goto connect_failed;
state = pa_stream_get_state (pbuf->stream);
#ifdef HAVE_PULSE_1_0
g_free (psink->device);
psink->device = g_strdup (pa_stream_get_device_name (pbuf->stream));
GST_LOG_OBJECT (psink, "stream state is now %d", state);
if (!PA_STREAM_IS_GOOD (state))
goto connect_failed;
if (state == PA_STREAM_READY)
break;
/* Wait until the stream is ready */
pa_threaded_mainloop_wait (mainloop);
}
#ifndef GST_DISABLE_GST_DEBUG
pa_format_info_snprint (print_buf, sizeof (print_buf),
pa_stream_get_format_info (pbuf->stream));
GST_INFO_OBJECT (psink, "negotiated to: %s", print_buf);
#endif
#endif
/* After we passed the volume off of to PA we never want to set it
again, since it is PA's job to save/restore volumes. */
@ -945,13 +999,20 @@ static gboolean
gst_pulseringbuffer_release (GstRingBuffer * buf)
{
GstPulseRingBuffer *pbuf;
GstPulseSink *psink;
pbuf = GST_PULSERING_BUFFER_CAST (buf);
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
pa_threaded_mainloop_lock (mainloop);
gst_pulsering_destroy_stream (pbuf);
pa_threaded_mainloop_unlock (mainloop);
#ifdef HAVE_PULSE_1_0
g_atomic_int_set (&psink->format_lost, FALSE);
psink->format_lost_time = GST_CLOCK_TIME_NONE;
#endif
return TRUE;
}
@ -973,6 +1034,13 @@ gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
#ifdef HAVE_PULSE_1_0
if (g_atomic_int_get (&psink->format_lost)) {
/* Sink format changed, stream's gone so fake being paused */
return TRUE;
}
#endif
GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
if (pbuf->corked != corked) {
if (!(o = pa_stream_cork (pbuf->stream, corked,
@ -1146,13 +1214,22 @@ gst_pulseringbuffer_stop (GstRingBuffer * buf)
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
pa_threaded_mainloop_lock (mainloop);
pbuf->paused = TRUE;
res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
/* Inform anyone waiting in _commit() call that it shall wakeup */
if (pbuf->in_commit) {
GST_DEBUG_OBJECT (psink, "signal commit thread");
pa_threaded_mainloop_signal (mainloop, 0);
}
#ifdef HAVE_PULSE_1_0
if (g_atomic_int_get (&psink->format_lost)) {
/* Don't try to flush, the stream's probably gone by now */
res = TRUE;
goto cleanup;
}
#endif
/* then try to flush, it's not fatal when this fails */
GST_DEBUG_OBJECT (psink, "flushing");
@ -1260,7 +1337,6 @@ G_STMT_START { \
GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess); \
} G_STMT_END
/* our custom commit function because we write into the buffer of pulseaudio
* instead of keeping our own buffer */
static guint
@ -1299,6 +1375,7 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
}
pa_threaded_mainloop_lock (mainloop);
GST_DEBUG_OBJECT (psink, "entering commit");
pbuf->in_commit = TRUE;
@ -1323,6 +1400,13 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
* needed to properly handle reverse playback: it points to the last sample. */
data_end = data + (bps * inr);
#ifdef HAVE_PULSE_1_0
if (g_atomic_int_get (&psink->format_lost)) {
/* Sink format changed, drop the data and hope upstream renegotiates */
goto fake_done;
}
#endif
if (pbuf->paused)
goto was_paused;
@ -1372,6 +1456,13 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
for (;;) {
pbuf->m_writable = pa_stream_writable_size (pbuf->stream);
#ifdef HAVE_PULSE_1_0
if (g_atomic_int_get (&psink->format_lost)) {
/* Sink format changed, give up and hope upstream renegotiates */
goto fake_done;
}
#endif
if (pbuf->m_writable == (size_t) - 1)
goto writable_size_failed;
@ -1430,6 +1521,14 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
(guint) avail, offset);
#ifdef HAVE_PULSE_1_0
/* No trick modes for passthrough streams */
if (G_UNLIKELY (inr != outr || reverse)) {
GST_WARNING_OBJECT (psink, "Passthrough stream can't run in trick mode");
goto unlock_and_fail;
}
#endif
if (G_LIKELY (inr == outr && !reverse)) {
/* no rate conversion, simply write out the samples */
/* copy the data into internal buffer */
@ -1509,6 +1608,10 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
}
}
}
#ifdef HAVE_PULSE_1_0
fake_done:
#endif
/* we consumed all samples here */
data = data_end + bps;
@ -1730,8 +1833,16 @@ gst_pulsesink_base_init (gpointer g_class)
"rate = (int) [ 1, MAX], "
"channels = (int) [ 1, 32 ];"
"audio/x-mulaw, "
"rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
);
"rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ];"
#ifdef HAVE_PULSE_1_0
"audio/x-ac3, framed = (boolean) true;"
"audio/x-eac3, framed = (boolean) true; "
"audio/x-dts, framed = (boolean) true, "
" block_size = (int) { 512, 1024, 2048 }; "
"audio/mpeg, mpegversion = (int)1, "
" mpegaudioversion = (int) [ 1, 2 ], parsed = (boolean) true; "
#endif
));
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
@ -1754,6 +1865,40 @@ gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
return buffer;
}
static GstBuffer *
gst_pulsesink_payload (GstBaseAudioSink * sink, GstBuffer * buf)
{
switch (sink->ringbuffer->spec.type) {
case GST_BUFTYPE_AC3:
case GST_BUFTYPE_EAC3:
case GST_BUFTYPE_DTS:
case GST_BUFTYPE_MPEG:
{
/* FIXME: alloc memory from PA if possible */
gint framesize = gst_audio_iec61937_frame_size (&sink->ringbuffer->spec);
GstBuffer *out;
if (framesize <= 0)
return NULL;
out = gst_buffer_new_and_alloc (framesize);
if (!gst_audio_iec61937_payload (GST_BUFFER_DATA (buf),
GST_BUFFER_SIZE (buf), GST_BUFFER_DATA (out),
GST_BUFFER_SIZE (out), &sink->ringbuffer->spec)) {
gst_buffer_unref (out);
return NULL;
}
gst_buffer_copy_metadata (out, buf, GST_BUFFER_COPY_ALL);
return out;
}
default:
return gst_buffer_ref (buf);
}
}
static void
gst_pulsesink_class_init (GstPulseSinkClass * klass)
{
@ -1778,6 +1923,7 @@ gst_pulsesink_class_init (GstPulseSinkClass * klass)
gstaudiosink_class->create_ringbuffer =
GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
gstaudiosink_class->payload = GST_DEBUG_FUNCPTR (gst_pulsesink_payload);
/* Overwrite GObject fields */
g_object_class_install_property (gobject_class,
@ -1860,6 +2006,14 @@ gst_pulsesink_get_time (GstClock * clock, GstBaseAudioSink * sink)
pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
#ifdef HAVE_PULSE_1_0
if (g_atomic_int_get (&psink->format_lost)) {
/* Stream was lost in a format change, it'll get set up again once
* upstream renegotiates */
return psink->format_lost_time;
}
#endif
pa_threaded_mainloop_lock (mainloop);
if (gst_pulsering_is_dead (psink, pbuf, TRUE))
goto server_dead;
@ -1888,6 +2042,181 @@ server_dead:
}
}
static void
gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
void *userdata)
{
GstPulseRingBuffer *pbuf;
GstPulseSink *psink;
#ifdef HAVE_PULSE_1_0
GList *l;
guint8 j;
#endif
pbuf = GST_PULSERING_BUFFER_CAST (userdata);
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
if (!i)
goto done;
g_free (psink->device_description);
psink->device_description = g_strdup (i->description);
#ifdef HAVE_PULSE_1_0
g_mutex_lock (psink->sink_formats_lock);
for (l = g_list_first (psink->sink_formats); l; l = g_list_next (l))
pa_format_info_free ((pa_format_info *) l->data);
g_list_free (psink->sink_formats);
psink->sink_formats = NULL;
for (j = 0; j < i->n_formats; j++)
psink->sink_formats = g_list_prepend (psink->sink_formats,
pa_format_info_copy (i->formats[j]));
g_mutex_unlock (psink->sink_formats_lock);
#endif
done:
pa_threaded_mainloop_signal (mainloop, 0);
}
#ifdef HAVE_PULSE_1_0
static gboolean
gst_pulsesink_pad_acceptcaps (GstPad * pad, GstCaps * caps)
{
GstPulseSink *psink = GST_PULSESINK (gst_pad_get_parent_element (pad));
GstPulseRingBuffer *pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK
(psink)->ringbuffer);
GstCaps *pad_caps;
GstStructure *st;
gboolean ret = FALSE;
GstRingBufferSpec spec = { 0 };
pa_stream *stream = NULL;
pa_operation *o = NULL;
pa_channel_map channel_map;
pa_stream_flags_t flags;
pa_format_info *format = NULL, *formats[1];
guint channels;
pad_caps = gst_pad_get_caps_reffed (pad);
if (pad_caps) {
ret = gst_caps_can_intersect (pad_caps, caps);
gst_caps_unref (pad_caps);
}
/* Either template caps didn't match, or we're still in NULL state */
if (!ret || !pbuf->context)
goto done;
/* If we've not got fixed caps, creating a stream might fail, so let's just
* return from here with default acceptcaps behaviour */
if (!gst_caps_is_fixed (caps))
goto done;
ret = FALSE;
pa_threaded_mainloop_lock (mainloop);
spec.latency_time = GST_BASE_AUDIO_SINK (psink)->latency_time;
if (!gst_ring_buffer_parse_caps (&spec, caps))
goto out;
if (!gst_pulse_fill_format_info (&spec, &format, &channels))
goto out;
/* Make sure input is framed (one frame per buffer) and can be payloaded */
if (!pa_format_info_is_pcm (format)) {
gboolean framed = FALSE, parsed = FALSE;
st = gst_caps_get_structure (caps, 0);
gst_structure_get_boolean (st, "framed", &framed);
gst_structure_get_boolean (st, "parsed", &parsed);
if ((!framed && !parsed) || gst_audio_iec61937_frame_size (&spec) <= 0)
goto out;
}
/* initialize the channel map */
if (pa_format_info_is_pcm (format) &&
gst_pulse_gst_to_channel_map (&channel_map, &spec))
pa_format_info_set_channel_map (format, &channel_map);
if (pbuf->stream) {
/* We're already in PAUSED or above, so just reuse this stream to query
* sink formats and use those. */
GList *i;
if (!(o = pa_context_get_sink_info_by_name (pbuf->context, psink->device,
gst_pulsesink_sink_info_cb, pbuf)))
goto info_failed;
while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
pa_threaded_mainloop_wait (mainloop);
if (gst_pulsering_is_dead (psink, pbuf, TRUE))
goto out;
}
g_mutex_lock (psink->sink_formats_lock);
for (i = g_list_first (psink->sink_formats); i; i = g_list_next (i)) {
if (pa_format_info_is_compatible ((pa_format_info *) i->data, format)) {
ret = TRUE;
break;
}
}
g_mutex_unlock (psink->sink_formats_lock);
} else {
/* We're in READY, let's connect a stream to see if the format is
* accpeted by whatever sink we're routed to */
formats[0] = format;
if (!(stream = pa_stream_new_extended (pbuf->context, "pulsesink probe",
formats, 1, psink->proplist)))
goto out;
/* construct the flags */
flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED;
pa_stream_set_state_callback (stream, gst_pulsering_stream_state_cb, pbuf);
if (pa_stream_connect_playback (stream, psink->device, NULL, flags, NULL,
NULL) < 0)
goto out;
ret = gst_pulsering_wait_for_stream_ready (psink, stream);
}
out:
if (format)
pa_format_info_free (format);
if (o)
pa_operation_unref (o);
if (stream) {
pa_stream_set_state_callback (stream, NULL, NULL);
pa_stream_disconnect (stream);
pa_stream_unref (stream);
}
pa_threaded_mainloop_unlock (mainloop);
done:
gst_object_unref (psink);
return ret;
info_failed:
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_get_sink_input_info() failed: %s",
pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto out;
}
}
#endif
static void
gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
{
@ -1896,6 +2225,11 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
pulsesink->device_description = NULL;
pulsesink->client_name = gst_pulse_client_name ();
#ifdef HAVE_PULSE_1_0
pulsesink->sink_formats_lock = g_mutex_new ();
pulsesink->sink_formats = NULL;
#endif
pulsesink->volume = DEFAULT_VOLUME;
pulsesink->volume_set = FALSE;
@ -1904,6 +2238,11 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
pulsesink->notify = 0;
#ifdef HAVE_PULSE_1_0
g_atomic_int_set (&pulsesink->format_lost, FALSE);
pulsesink->format_lost_time = GST_CLOCK_TIME_NONE;
#endif
pulsesink->properties = NULL;
pulsesink->proplist = NULL;
@ -1915,6 +2254,11 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
gst_audio_clock_new ("GstPulseSinkClock",
(GstAudioClockGetTimeFunc) gst_pulsesink_get_time, pulsesink);
#ifdef HAVE_PULSE_1_0
gst_pad_set_acceptcaps_function (GST_BASE_SINK (pulsesink)->sinkpad,
GST_DEBUG_FUNCPTR (gst_pulsesink_pad_acceptcaps));
#endif
/* TRUE for sinks, FALSE for sources */
pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink),
G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device,
@ -1925,12 +2269,23 @@ static void
gst_pulsesink_finalize (GObject * object)
{
GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
#ifdef HAVE_PULSE_1_0
GList *i;
#endif
g_free (pulsesink->server);
g_free (pulsesink->device);
g_free (pulsesink->device_description);
g_free (pulsesink->client_name);
#ifdef HAVE_PULSE_1_0
for (i = g_list_first (pulsesink->sink_formats); i; i = g_list_next (i))
pa_format_info_free ((pa_format_info *) i->data);
g_list_free (pulsesink->sink_formats);
g_mutex_free (pulsesink->sink_formats_lock);
#endif
if (pulsesink->properties)
gst_structure_free (pulsesink->properties);
if (pulsesink->proplist)
@ -1969,6 +2324,10 @@ gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
#ifdef HAVE_PULSE_1_0
if (pa_format_info_is_pcm (pbuf->format))
gst_pulse_cvolume_from_linear (&v, pbuf->channels, volume);
else
/* FIXME: this will eventually be superceded by checks to see if the volume
* is readable/writable */
goto unlock;
#else
gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
#endif
@ -2248,26 +2607,6 @@ info_failed:
}
}
static void
gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
void *userdata)
{
GstPulseRingBuffer *pbuf;
GstPulseSink *psink;
pbuf = GST_PULSERING_BUFFER_CAST (userdata);
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
if (!i)
goto done;
g_free (psink->device_description);
psink->device_description = g_strdup (i->description);
done:
pa_threaded_mainloop_signal (mainloop, 0);
}
static gchar *
gst_pulsesink_device_description (GstPulseSink * psink)
{
@ -2664,6 +3003,7 @@ gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
gst_message_new_clock_provide (GST_OBJECT_CAST (element),
GST_BASE_AUDIO_SINK (pulsesink)->provided_clock, TRUE));
break;
default:
break;
}
@ -2674,6 +3014,7 @@ gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
/* format_lost is reset in release() in baseaudiosink */
gst_element_post_message (element,
gst_message_new_clock_lost (GST_OBJECT_CAST (element),
GST_BASE_AUDIO_SINK (pulsesink)->provided_clock));

View file

@ -72,6 +72,13 @@ struct _GstPulseSink
GstStructure *properties;
pa_proplist *proplist;
#ifdef HAVE_PULSE_1_0
GMutex *sink_formats_lock;
GList *sink_formats;
volatile gint format_lost;
GstClockTime format_lost_time;
#endif
};
struct _GstPulseSinkClass

View file

@ -167,6 +167,14 @@ gst_pulse_fill_format_info (GstRingBufferSpec * spec, pa_format_info ** f,
} else if (spec->format == GST_S24_BE && spec->width == 32) {
format->encoding = PA_ENCODING_PCM;
sf = PA_SAMPLE_S24_32BE;
} else if (spec->format == GST_AC3) {
format->encoding = PA_ENCODING_AC3_IEC61937;
} else if (spec->format == GST_EAC3) {
format->encoding = PA_ENCODING_EAC3_IEC61937;
} else if (spec->format == GST_DTS) {
format->encoding = PA_ENCODING_DTS_IEC61937;
} else if (spec->format == GST_MPEG) {
format->encoding = PA_ENCODING_MPEG_IEC61937;
} else {
goto fail;
}