From 6bc6cafcc6086eb3072f4f1b4936393d9280fbb9 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 8 Apr 2009 13:52:41 +0200 Subject: [PATCH] pulsesink: rewrite pulsesink Derive from BaseAudioSink and implement our custom ringbuffer that maps to the internal pulseaudio ringbuffer. --- ext/pulse/pulsesink.c | 2045 +++++++++++++++++++++++++---------------- ext/pulse/pulsesink.h | 18 +- 2 files changed, 1281 insertions(+), 782 deletions(-) diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c index 5064f6f407..6fcd33787c 100644 --- a/ext/pulse/pulsesink.c +++ b/ext/pulse/pulsesink.c @@ -1,7 +1,7 @@ -/* - * GStreamer pulseaudio plugin +/* GStreamer pulseaudio plugin * * Copyright (c) 2004-2008 Lennart Poettering + * (c) 2009 Wim Taymans * * gst-pulse is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as @@ -66,9 +66,1057 @@ enum PROP_VOLUME }; -static void gst_pulsesink_destroy_stream (GstPulseSink * pulsesink); +#define GST_TYPE_PULSERING_BUFFER \ + (gst_pulseringbuffer_get_type()) +#define GST_PULSERING_BUFFER(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer)) +#define GST_PULSERING_BUFFER_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass)) +#define GST_PULSERING_BUFFER_GET_CLASS(obj) \ + (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass)) +#define GST_PULSERING_BUFFER_CAST(obj) \ + ((GstPulseRingBuffer *)obj) +#define GST_IS_PULSERING_BUFFER(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER)) +#define GST_IS_PULSERING_BUFFER_CLASS(klass)\ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER)) -static void gst_pulsesink_destroy_context (GstPulseSink * pulsesink); +typedef struct _GstPulseRingBuffer GstPulseRingBuffer; +typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass; + +/* We keep a custom ringbuffer that is backed up by data allocated by + * pulseaudio. We must also overide the commit function to write into + * pulseaudio memory instead. */ +struct _GstPulseRingBuffer +{ + GstRingBuffer object; + + gchar *stream_name; + + pa_context *context; + pa_stream *stream; + + pa_sample_spec sample_spec; + + gboolean corked; + gboolean in_commit; + gboolean paused; + guint required; +}; + +struct _GstPulseRingBufferClass +{ + GstRingBufferClass parent_class; +}; + +static void gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass); +static void gst_pulseringbuffer_init (GstPulseRingBuffer * ringbuffer, + GstPulseRingBufferClass * klass); +static void gst_pulseringbuffer_finalize (GObject * object); + +static GstRingBufferClass *ring_parent_class = NULL; + +static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf, + GstRingBufferSpec * spec); +static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_activate (GstRingBuffer * buf, + gboolean active); +static guint gst_pulseringbuffer_commit (GstRingBuffer * buf, + guint64 * sample, guchar * data, gint in_samples, gint out_samples, + gint * accum); + +/* ringbuffer abstract base class */ +static GType +gst_pulseringbuffer_get_type (void) +{ + static GType ringbuffer_type = 0; + + if (!ringbuffer_type) { + static const GTypeInfo ringbuffer_info = { + sizeof (GstPulseRingBufferClass), + NULL, + NULL, + (GClassInitFunc) gst_pulseringbuffer_class_init, + NULL, + NULL, + sizeof (GstPulseRingBuffer), + 0, + (GInstanceInitFunc) gst_pulseringbuffer_init, + NULL + }; + + ringbuffer_type = + g_type_register_static (GST_TYPE_RING_BUFFER, "GstPulseSinkRingBuffer", + &ringbuffer_info, 0); + } + return ringbuffer_type; +} + +static void +gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass) +{ + GObjectClass *gobject_class; + GstObjectClass *gstobject_class; + GstRingBufferClass *gstringbuffer_class; + + gobject_class = (GObjectClass *) klass; + gstobject_class = (GstObjectClass *) klass; + gstringbuffer_class = (GstRingBufferClass *) klass; + + ring_parent_class = g_type_class_peek_parent (klass); + + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_finalize); + + gstringbuffer_class->open_device = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device); + gstringbuffer_class->close_device = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device); + gstringbuffer_class->acquire = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire); + gstringbuffer_class->release = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release); + gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start); + gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause); + gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start); + gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop); + + gstringbuffer_class->activate = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_activate); + gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit); +} + +static void +gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf, + GstPulseRingBufferClass * g_class) +{ + pbuf->stream_name = NULL; + pbuf->context = NULL; + pbuf->stream = NULL; + +#if HAVE_PULSE_0_9_13 + pa_sample_spec_init (&pbuf->sample_spec); +#else + pbuf->sample_spec.format = PA_SAMPLE_INVALID; + pbuf->sample_spec.rate = 0; + pbuf->sample_spec.channels = 0; +#endif + + pbuf->corked = TRUE; +} + +static void +gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf) +{ + if (pbuf->stream) { + pa_stream_disconnect (pbuf->stream); + + /* Make sure we don't get any further callbacks */ + pa_stream_set_state_callback (pbuf->stream, NULL, NULL); + pa_stream_set_write_callback (pbuf->stream, NULL, NULL); + pa_stream_set_latency_update_callback (pbuf->stream, NULL, NULL); + + pa_stream_unref (pbuf->stream); + pbuf->stream = NULL; + } + + g_free (pbuf->stream_name); + pbuf->stream_name = NULL; +} + +static void +gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf) +{ + gst_pulsering_destroy_stream (pbuf); + + if (pbuf->context) { + pa_context_disconnect (pbuf->context); + + /* Make sure we don't get any further callbacks */ + pa_context_set_state_callback (pbuf->context, NULL, NULL); + pa_context_set_subscribe_callback (pbuf->context, NULL, NULL); + + pa_context_unref (pbuf->context); + pbuf->context = NULL; + } +} + +static void +gst_pulseringbuffer_finalize (GObject * object) +{ + GstPulseRingBuffer *ringbuffer; + + ringbuffer = GST_PULSERING_BUFFER_CAST (object); + + gst_pulsering_destroy_context (ringbuffer); + + G_OBJECT_CLASS (ring_parent_class)->finalize (object); +} + +static gboolean +gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf) +{ + if (!pbuf->context + || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context)) + || !pbuf->stream + || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) { + const gchar *err_str = pbuf->context ? + pa_strerror (pa_context_errno (pbuf->context)) : NULL; + + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s", + err_str), (NULL)); + return TRUE; + } + return FALSE; +} + +static void +gst_pulsering_context_state_cb (pa_context * c, void *userdata) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + pa_context_state_t state; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + state = pa_context_get_state (c); + GST_LOG_OBJECT (psink, "got new context state %d", state); + + switch (state) { + case PA_CONTEXT_READY: + case PA_CONTEXT_TERMINATED: + case PA_CONTEXT_FAILED: + GST_LOG_OBJECT (psink, "signaling"); + pa_threaded_mainloop_signal (psink->mainloop, 0); + break; + + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; + } +} + +#if HAVE_PULSE_0_9_12 +static void +gst_pulsering_context_subscribe_cb (pa_context * c, + pa_subscription_event_type_t t, uint32_t idx, void *userdata) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) && + t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW)) + return; + + if (!psink->stream) + return; + + if (idx != pa_stream_get_index (pbuf->stream)) + return; + + /* Actually this event is also triggered when other properties of + * the stream change that are unrelated to the volume. However it is + * probably cheaper to signal the change here and check for the + * volume when the GObject property is read instead of querying it always. */ + + /* inform streaming thread to notify */ + g_atomic_int_compare_and_exchange (&psink->notify, 0, 1); +} +#endif + +/* will be called when the device should be opened. In this case we will connect + * to the server. We should not try to open any streams in this state. */ +static gboolean +gst_pulseringbuffer_open_device (GstRingBuffer * buf) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + gchar *name; + pa_mainloop_api *api; + + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + pbuf = GST_PULSERING_BUFFER_CAST (buf); + + g_assert (!pbuf->context); + g_assert (!pbuf->stream); + + name = gst_pulse_client_name (); + + pa_threaded_mainloop_lock (psink->mainloop); + + /* get the mainloop api and create a context */ + GST_LOG_OBJECT (psink, "new context with name %s", GST_STR_NULL (name)); + api = pa_threaded_mainloop_get_api (psink->mainloop); + if (!(pbuf->context = pa_context_new (api, name))) + goto create_failed; + + /* register some essential callbacks */ + pa_context_set_state_callback (pbuf->context, + gst_pulsering_context_state_cb, pbuf); +#if HAVE_PULSE_0_9_12 + pa_context_set_subscribe_callback (psink->context, + gst_pulsering_context_subscribe_cb, pbuf); +#endif + + /* try to connect to the server and wait for completioni, we don't want to + * autospawn a deamon */ + GST_LOG_OBJECT (psink, "connect to server %s", GST_STR_NULL (psink->server)); + if (pa_context_connect (pbuf->context, psink->server, PA_CONTEXT_NOAUTOSPAWN, + NULL) < 0) + goto connect_failed; + + for (;;) { + pa_context_state_t state; + + state = pa_context_get_state (pbuf->context); + + GST_LOG_OBJECT (psink, "context state is now %d", state); + + if (!PA_CONTEXT_IS_GOOD (state)) + goto connect_failed; + + if (state == PA_CONTEXT_READY) + break; + + /* Wait until the context is ready */ + GST_LOG_OBJECT (psink, "waiting.."); + pa_threaded_mainloop_wait (psink->mainloop); + } + + GST_LOG_OBJECT (psink, "opened the device"); + + pa_threaded_mainloop_unlock (psink->mainloop); + g_free (name); + + return TRUE; + + /* ERRORS */ +unlock_and_fail: + { + gst_pulsering_destroy_context (pbuf); + + pa_threaded_mainloop_unlock (psink->mainloop); + g_free (name); + return FALSE; + } +create_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("Failed to create context"), (NULL)); + goto unlock_and_fail; + } +connect_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +} + +/* close the device */ +static gboolean +gst_pulseringbuffer_close_device (GstRingBuffer * buf) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + + GST_LOG_OBJECT (psink, "closing device"); + + pa_threaded_mainloop_lock (psink->mainloop); + gst_pulsering_destroy_context (pbuf); + pa_threaded_mainloop_unlock (psink->mainloop); + + GST_LOG_OBJECT (psink, "closed device"); + + return TRUE; +} + +static void +gst_pulsering_stream_state_cb (pa_stream * s, void *userdata) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + pa_stream_state_t state; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + state = pa_stream_get_state (s); + GST_LOG_OBJECT (psink, "got new stream state %d", state); + + switch (state) { + case PA_STREAM_READY: + case PA_STREAM_FAILED: + case PA_STREAM_TERMINATED: + GST_LOG_OBJECT (psink, "signaling"); + pa_threaded_mainloop_signal (psink->mainloop, 0); + break; + case PA_STREAM_UNCONNECTED: + case PA_STREAM_CREATING: + break; + } +} + +static void +gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length); + + if (pbuf->in_commit) { + pa_threaded_mainloop_signal (psink->mainloop, 0); + } +} + +static void +gst_pulsering_stream_latency_update_cb (pa_stream * s, void *userdata) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + GST_LOG_OBJECT (psink, "got latency update callback"); + + pa_threaded_mainloop_signal (psink->mainloop, 0); +} + +/* This method should create a new stream of the given @spec. No playback should + * start yet so we start in the corked state. */ +static gboolean +gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + pa_buffer_attr buf_attr; + const pa_buffer_attr *buf_attr_ptr; + pa_channel_map channel_map; + pa_operation *o = NULL; + pa_cvolume v, *pv; + pa_stream_flags_t flags; + const gchar *name; + + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + pbuf = GST_PULSERING_BUFFER_CAST (buf); + + GST_LOG_OBJECT (psink, "creating sample spec"); + /* convert the gstreamer sample spec to the pulseaudio format */ + if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec)) + goto invalid_spec; + + pa_threaded_mainloop_lock (psink->mainloop); + + /* we need a context and a no stream */ + g_assert (pbuf->context); + g_assert (!pbuf->stream); + + /* enable event notifications */ + GST_LOG_OBJECT (psink, "subscribing to context events"); + if (!(o = pa_context_subscribe (pbuf->context, + PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) + goto subscribe_failed; + + pa_operation_unref (o); + + /* initialize the channel map */ + gst_pulse_gst_to_channel_map (&channel_map, spec); + + /* find a good name for the stream */ + if (psink->stream_name) + name = psink->stream_name; + else + name = "Playback Stream"; + + /* create a stream */ + GST_LOG_OBJECT (psink, "creating stream with name %s", name); + if (!(pbuf->stream = pa_stream_new (pbuf->context, + name, &pbuf->sample_spec, &channel_map))) + goto stream_failed; + + /* install essential callbacks */ + pa_stream_set_state_callback (pbuf->stream, + gst_pulsering_stream_state_cb, pbuf); + pa_stream_set_write_callback (pbuf->stream, + gst_pulsering_stream_request_cb, pbuf); + pa_stream_set_latency_update_callback (pbuf->stream, + gst_pulsering_stream_latency_update_cb, pbuf); + + /* buffering requirements */ + memset (&buf_attr, 0, sizeof (buf_attr)); + buf_attr.tlength = spec->segtotal * spec->segsize; + buf_attr.maxlength = buf_attr.tlength * 2; + //buf_attr.prebuf = buf_attr.tlength; + buf_attr.prebuf = spec->segsize; + buf_attr.minreq = spec->segsize; + + GST_INFO_OBJECT (psink, "tlength: %d", buf_attr.tlength); + GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr.maxlength); + GST_INFO_OBJECT (psink, "prebuf: %d", buf_attr.prebuf); + GST_INFO_OBJECT (psink, "minreq: %d", buf_attr.minreq); + + /* configure volume when we changed it, else we leave the default */ + if (psink->volume_set) { + GST_LOG_OBJECT (psink, "have volume of %f", psink->volume); + pv = &v; + gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels, + psink->volume); + } else { + pv = NULL; + } + + /* construct the flags */ + flags = PA_STREAM_INTERPOLATE_TIMING | + PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_NOT_MONOTONOUS | +#if HAVE_PULSE_0_9_11 + PA_STREAM_ADJUST_LATENCY | +#endif + PA_STREAM_START_CORKED; + + /* we always start corked (see flags above) */ + pbuf->corked = TRUE; + + /* try to connect now */ + GST_LOG_OBJECT (psink, "connect for playback to device %s", + GST_STR_NULL (psink->device)); + if (pa_stream_connect_playback (pbuf->stream, psink->device, + &buf_attr, flags, pv, NULL) < 0) + goto connect_failed; + + for (;;) { + pa_stream_state_t state; + + state = pa_stream_get_state (pbuf->stream); + + GST_LOG_OBJECT (psink, "stream state is now %d", state); + + if (!PA_STREAM_IS_GOOD (state)) + goto connect_failed; + + if (state == PA_STREAM_READY) + break; + + /* Wait until the stream is ready */ + pa_threaded_mainloop_wait (psink->mainloop); + } + + GST_LOG_OBJECT (psink, "stream is acquired now"); + + /* get the actual buffering properties now */ + buf_attr_ptr = pa_stream_get_buffer_attr (pbuf->stream); + + GST_INFO_OBJECT (psink, "tlength: %d", buf_attr_ptr->tlength); + GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr_ptr->maxlength); + GST_INFO_OBJECT (psink, "prebuf: %d", buf_attr_ptr->prebuf); + GST_INFO_OBJECT (psink, "minreq: %d", buf_attr_ptr->minreq); + + spec->segsize = buf_attr.minreq; + spec->segtotal = buf_attr.tlength / spec->segsize; + + pa_threaded_mainloop_unlock (psink->mainloop); + + return TRUE; + + /* ERRORS */ +unlock_and_fail: + { + gst_pulsering_destroy_stream (pbuf); + pa_threaded_mainloop_unlock (psink->mainloop); + + return FALSE; + } +invalid_spec: + { + GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS, + ("Invalid sample specification."), (NULL)); + return FALSE; + } +subscribe_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_context_subscribe() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +stream_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("Failed to create stream: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +connect_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("Failed to connect stream: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +} + +/* free the stream that we acquired before */ +static gboolean +gst_pulseringbuffer_release (GstRingBuffer * buf) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + pbuf = GST_PULSERING_BUFFER_CAST (buf); + + pa_threaded_mainloop_lock (psink->mainloop); + gst_pulsering_destroy_stream (pbuf); + pa_threaded_mainloop_unlock (psink->mainloop); + + return TRUE; +} + +/* this method should start the thread that starts pulling data. Usually only + * used in pull-based scheduling */ +static gboolean +gst_pulseringbuffer_activate (GstRingBuffer * buf, gboolean active) +{ + GstPulseSink *psink; + + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + + return TRUE; +} + +/* update the corked state of a stream, must be called with the mainloop + * lock */ +static gboolean +gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked) +{ + pa_operation *o = NULL; + GstPulseSink *psink; + gboolean res = FALSE; + + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked); + if (pbuf->corked != corked) { + if (!(o = pa_stream_cork (pbuf->stream, corked, NULL, NULL))) + goto cork_failed; + + while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + pa_threaded_mainloop_wait (psink->mainloop); + if (gst_pulsering_is_dead (psink, pbuf)) + goto server_dead; + } + pbuf->corked = corked; + } + res = TRUE; + +cleanup: + if (o) + pa_operation_unref (o); + + return res; + + /* ERRORS */ +server_dead: + { + GST_DEBUG_OBJECT (psink, "the server is dead"); + goto cleanup; + } +cork_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_cork() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto cleanup; + } +} + +/* start/resume playback ASAP */ +static gboolean +gst_pulseringbuffer_start (GstRingBuffer * buf) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + gboolean res; + + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + GST_DEBUG_OBJECT (psink, "uncorking"); + pa_threaded_mainloop_lock (psink->mainloop); + pbuf->paused = FALSE; + res = gst_pulsering_set_corked (pbuf, FALSE); + pa_threaded_mainloop_unlock (psink->mainloop); + + return res; +} + +/* pause/stop playback ASAP */ +static gboolean +gst_pulseringbuffer_pause (GstRingBuffer * buf) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + gboolean res; + + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + GST_DEBUG_OBJECT (psink, "corking"); + pa_threaded_mainloop_lock (psink->mainloop); + /* make sure the commit method stops writing */ + pbuf->paused = TRUE; + res = gst_pulsering_set_corked (pbuf, TRUE); + if (pbuf->in_commit) { + /* we are waiting in a commit, signal */ + GST_DEBUG_OBJECT (psink, "signal commit"); + pa_threaded_mainloop_signal (psink->mainloop, 0); + } + pa_threaded_mainloop_unlock (psink->mainloop); + + return res; +} + +static void +gst_pulsering_success_cb (pa_stream * s, int success, void *userdata) +{ + GstPulseRingBuffer *pbuf; + GstPulseSink *psink; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + pa_threaded_mainloop_signal (psink->mainloop, 0); +} + +/* stop playback, we flush everything. */ +static gboolean +gst_pulseringbuffer_stop (GstRingBuffer * buf) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + gboolean res = FALSE; + pa_operation *o = NULL; + + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + pa_threaded_mainloop_lock (psink->mainloop); + pbuf->paused = TRUE; + /* Inform anyone waiting in _commit() call that it shall wakeup */ + if (pbuf->in_commit) { + GST_DEBUG_OBJECT (psink, "signal commit thread"); + pa_threaded_mainloop_signal (psink->mainloop, 0); + } + + /* then try to flush, it's not fatal when this fails */ + GST_DEBUG_OBJECT (psink, "flushing"); + if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) { + while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + GST_DEBUG_OBJECT (psink, "wait for completion"); + pa_threaded_mainloop_wait (psink->mainloop); + if (gst_pulsering_is_dead (psink, pbuf)) + goto server_dead; + } + GST_DEBUG_OBJECT (psink, "flush completed"); + } + + res = TRUE; + +cleanup: + if (o) { + pa_operation_cancel (o); + pa_operation_unref (o); + } + pa_threaded_mainloop_unlock (psink->mainloop); + + return res; + + /* ERRORS */ +server_dead: + { + GST_DEBUG_OBJECT (psink, "the server is dead"); + goto cleanup; + } +} + +/* in_samples >= out_samples, rate > 1.0 */ +#define FWD_UP_SAMPLES(s,se,d,de) \ +G_STMT_START { \ + guint8 *sb = s, *db = d; \ + while (s <= se && d < de) { \ + memcpy (d, s, bps); \ + s += bps; \ + *accum += outr; \ + if ((*accum << 1) >= inr) { \ + *accum -= inr; \ + d += bps; \ + } \ + } \ + in_samples -= (s - sb)/bps; \ + out_samples -= (d - db)/bps; \ + GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess); \ +} G_STMT_END + +/* out_samples > in_samples, for rates smaller than 1.0 */ +#define FWD_DOWN_SAMPLES(s,se,d,de) \ +G_STMT_START { \ + guint8 *sb = s, *db = d; \ + while (s <= se && d < de) { \ + memcpy (d, s, bps); \ + d += bps; \ + *accum += inr; \ + if ((*accum << 1) >= outr) { \ + *accum -= outr; \ + s += bps; \ + } \ + } \ + in_samples -= (s - sb)/bps; \ + out_samples -= (d - db)/bps; \ + GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess); \ +} G_STMT_END + +#define REV_UP_SAMPLES(s,se,d,de) \ +G_STMT_START { \ + guint8 *sb = se, *db = d; \ + while (s <= se && d < de) { \ + memcpy (d, se, bps); \ + se -= bps; \ + *accum += outr; \ + while ((*accum << 1) >= inr) { \ + *accum -= inr; \ + d += bps; \ + } \ + } \ + in_samples -= (sb - se)/bps; \ + out_samples -= (d - db)/bps; \ + GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess); \ +} G_STMT_END + +#define REV_DOWN_SAMPLES(s,se,d,de) \ +G_STMT_START { \ + guint8 *sb = se, *db = d; \ + while (s <= se && d < de) { \ + memcpy (d, se, bps); \ + d += bps; \ + *accum += inr; \ + while ((*accum << 1) >= outr) { \ + *accum -= outr; \ + se -= bps; \ + } \ + } \ + in_samples -= (sb - se)/bps; \ + out_samples -= (d - db)/bps; \ + GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess); \ +} G_STMT_END + + +/* our custom commit function because we write into the buffer of pulseaudio + * instead of keeping our own buffer */ +static guint +gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, + guchar * data, gint in_samples, gint out_samples, gint * accum) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + guint result; + guint bps; + guint8 *data_end; + gboolean reverse; + gint *toprocess; + gint inr, outr; + + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + /* FIXME post message rather than using a signal (as mixer interface) */ + if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) + g_object_notify (G_OBJECT (psink), "volume"); + + /* make sure the ringbuffer is started */ + if (G_UNLIKELY (g_atomic_int_get (&buf->state) != + GST_RING_BUFFER_STATE_STARTED)) { + /* see if we are allowed to start it */ + if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE)) + goto no_start; + + GST_DEBUG_OBJECT (buf, "start!"); + if (!gst_ring_buffer_start (buf)) + goto start_failed; + } + + pa_threaded_mainloop_lock (psink->mainloop); + GST_DEBUG_OBJECT (psink, "entering commit"); + pbuf->in_commit = TRUE; + + bps = buf->spec.bytes_per_sample; + + /* our toy resampler for trick modes */ + reverse = out_samples < 0; + out_samples = ABS (out_samples); + + if (in_samples >= out_samples) + toprocess = &in_samples; + else + toprocess = &out_samples; + + inr = in_samples - 1; + outr = out_samples - 1; + + /* data_end points to the last sample we have to write, not past it. This is + * needed to properly handle reverse playback: it points to the last sample. */ + data_end = data + (bps * inr); + + if (pbuf->paused) + goto was_paused; + + while (*toprocess > 0) { + size_t avail; + guint towrite; + gint64 offset; + + GST_LOG_OBJECT (psink, "need to write %d samples", *toprocess); + for (;;) { + if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1) + goto writable_size_failed; + + /* convert to samples, we can only deal with multiples of the + * sample size */ + avail /= bps; + + /* We always try to satisfy a request for data */ + GST_LOG_OBJECT (psink, "writable samples %" G_GSIZE_FORMAT, avail); + if (avail > 0) + break; + + /* we can't write a single byte, wait a bit */ + GST_LOG_OBJECT (psink, "waiting for free space"); + pa_threaded_mainloop_wait (psink->mainloop); + + if (pbuf->paused) + goto was_paused; + } + + if (avail > out_samples) + avail = out_samples; + + GST_LOG_OBJECT (psink, "writing %d samples at offset %" G_GUINT64_FORMAT, + avail, *sample); + + offset = *sample * bps; + towrite = avail * bps; + + if (G_LIKELY (inr == outr && !reverse)) { + /* no rate conversion, simply write out the samples */ + if (pa_stream_write (pbuf->stream, data, towrite, NULL, offset, + PA_SEEK_ABSOLUTE) < 0) + goto write_failed; + + data += towrite; + in_samples -= avail; + out_samples -= avail; + } else { + guint8 *dest, *d, *d_end; + + /* we need to allocate a temporary buffer to resample the data into, + * FIXME, we should have a pulseaudio API to allocate this buffer for us + * from the shared memory. */ + dest = d = g_malloc (towrite); + d_end = d + towrite; + + if (!reverse) { + if (inr >= outr) + /* forward speed up */ + FWD_UP_SAMPLES (data, data_end, d, d_end); + else + /* forward slow down */ + FWD_DOWN_SAMPLES (data, data_end, d, d_end); + } else { + if (inr >= outr) + /* reverse speed up */ + REV_UP_SAMPLES (data, data_end, d, d_end); + else + /* reverse slow down */ + REV_DOWN_SAMPLES (data, data_end, d, d_end); + } + /* see what we have left to write */ + towrite = (d - dest); + if (pa_stream_write (pbuf->stream, dest, towrite, + g_free, offset, PA_SEEK_ABSOLUTE) < 0) + goto write_failed; + + avail = towrite / bps; + } + *sample += avail; + } + /* we consumed all samples here */ + data = data_end + bps; + + pbuf->in_commit = FALSE; + pa_threaded_mainloop_unlock (psink->mainloop); + +done: + result = inr - ((data_end - data) / bps); + GST_LOG_OBJECT (psink, "wrote %d samples", result); + + return result; + + /* ERRORS */ +unlock_and_fail: + { + pbuf->in_commit = FALSE; + GST_LOG_OBJECT (psink, "we are reset"); + pa_threaded_mainloop_unlock (psink->mainloop); + goto done; + } +no_start: + { + GST_LOG_OBJECT (psink, "we can not start"); + return 0; + } +start_failed: + { + GST_LOG_OBJECT (psink, "failed to start the ringbuffer"); + return 0; + } +was_paused: + { + pbuf->in_commit = FALSE; + GST_LOG_OBJECT (psink, "we are paused"); + pa_threaded_mainloop_unlock (psink->mainloop); + goto done; + } +writable_size_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_writable_size() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +write_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_write() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +} static void gst_pulsesink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -76,29 +1124,10 @@ static void gst_pulsesink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static void gst_pulsesink_finalize (GObject * object); -static gboolean gst_pulsesink_open (GstAudioSink * asink); - -static gboolean gst_pulsesink_close (GstAudioSink * asink); - -static gboolean gst_pulsesink_prepare (GstAudioSink * asink, - GstRingBufferSpec * spec); -static gboolean gst_pulsesink_unprepare (GstAudioSink * asink); - -static guint gst_pulsesink_write (GstAudioSink * asink, gpointer data, - guint length); -static guint gst_pulsesink_delay (GstAudioSink * asink); - -static void gst_pulsesink_reset (GstAudioSink * asink); - static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event); -static GstStateChangeReturn gst_pulsesink_change_state (GstElement * - element, GstStateChange transition); - static void gst_pulsesink_init_interfaces (GType type); -static gboolean gst_pulsesink_is_dead (GstPulseSink * pulsesink); - #if (G_BYTE_ORDER == G_LITTLE_ENDIAN) # define ENDIANNESS "LITTLE_ENDIAN, BIG_ENDIAN" #else @@ -106,14 +1135,14 @@ static gboolean gst_pulsesink_is_dead (GstPulseSink * pulsesink); #endif GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink); -GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstAudioSink, - GST_TYPE_AUDIO_SINK, gst_pulsesink_init_interfaces); +GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink, + GST_TYPE_BASE_AUDIO_SINK, gst_pulsesink_init_interfaces); static gboolean gst_pulsesink_interface_supported (GstImplementsInterface * iface, GType interface_type) { - GstPulseSink *this = GST_PULSESINK (iface); + GstPulseSink *this = GST_PULSESINK_CAST (iface); if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe) return TRUE; @@ -150,7 +1179,6 @@ gst_pulsesink_init_interfaces (GType type) static void gst_pulsesink_base_init (gpointer g_class) { - static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, @@ -195,30 +1223,33 @@ gst_pulsesink_base_init (gpointer g_class) gst_static_pad_template_get (&pad_template)); } +static GstRingBuffer * +gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink) +{ + GstRingBuffer *buffer; + + GST_DEBUG_OBJECT (sink, "creating ringbuffer"); + buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL); + GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer); + + return buffer; +} + static void gst_pulsesink_class_init (GstPulseSinkClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass); - GstAudioSinkClass *gstaudiosink_class = GST_AUDIO_SINK_CLASS (klass); + GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass); gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulsesink_finalize); gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pulsesink_set_property); gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pulsesink_get_property); - gstelement_class->change_state = - GST_DEBUG_FUNCPTR (gst_pulsesink_change_state); - gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event); - gstaudiosink_class->open = GST_DEBUG_FUNCPTR (gst_pulsesink_open); - gstaudiosink_class->close = GST_DEBUG_FUNCPTR (gst_pulsesink_close); - gstaudiosink_class->prepare = GST_DEBUG_FUNCPTR (gst_pulsesink_prepare); - gstaudiosink_class->unprepare = GST_DEBUG_FUNCPTR (gst_pulsesink_unprepare); - gstaudiosink_class->write = GST_DEBUG_FUNCPTR (gst_pulsesink_write); - gstaudiosink_class->delay = GST_DEBUG_FUNCPTR (gst_pulsesink_delay); - gstaudiosink_class->reset = GST_DEBUG_FUNCPTR (gst_pulsesink_reset); + gstaudiosink_class->create_ringbuffer = + GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer); /* Overwrite GObject fields */ g_object_class_install_property (gobject_class, @@ -244,91 +1275,66 @@ gst_pulsesink_class_init (GstPulseSinkClass * klass) #endif } +/* returns the current time of the sink ringbuffer */ +static GstClockTime +gst_pulse_sink_get_time (GstClock * clock, GstBaseAudioSink * sink) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + pa_usec_t time; + + if (sink->ringbuffer == NULL || sink->ringbuffer->spec.rate == 0) + return GST_CLOCK_TIME_NONE; + + pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + pa_threaded_mainloop_lock (psink->mainloop); + /* if we don't have enough data to get a timestamp, just return 0 */ + if (pa_stream_get_time (pbuf->stream, &time) < 0) + time = 0; + pa_threaded_mainloop_unlock (psink->mainloop); + + time *= 1000; + + GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT, + GST_TIME_ARGS (time)); + + return time; +} + static void gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) { - int e; - - pulsesink->server = pulsesink->device = pulsesink->stream_name = - pulsesink->device_description = NULL; - - pulsesink->context = NULL; - pulsesink->stream = NULL; + pulsesink->server = NULL; + pulsesink->device = NULL; + pulsesink->device_description = NULL; pulsesink->volume = 1.0; pulsesink->volume_set = FALSE; -#if HAVE_PULSE_0_9_13 - pa_sample_spec_init (&pulsesink->sample_spec); -#else - pulsesink->sample_spec.format = PA_SAMPLE_INVALID; - pulsesink->sample_spec.rate = 0; - pulsesink->sample_spec.channels = 0; -#endif - - pulsesink->operation_success = FALSE; - pulsesink->did_reset = FALSE; - pulsesink->in_write = FALSE; pulsesink->notify = 0; - pulsesink->mainloop = pa_threaded_mainloop_new (); - g_assert (pulsesink->mainloop); - - e = pa_threaded_mainloop_start (pulsesink->mainloop); - g_assert (e == 0); + g_assert ((pulsesink->mainloop = pa_threaded_mainloop_new ())); + g_assert (pa_threaded_mainloop_start (pulsesink->mainloop) == 0); pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE); /* TRUE for sinks, FALSE for sources */ -} -static void -gst_pulsesink_destroy_stream (GstPulseSink * pulsesink) -{ - if (pulsesink->stream) { - pa_stream_disconnect (pulsesink->stream); - - /* Make sure we don't get any further callbacks */ - pa_stream_set_state_callback (pulsesink->stream, NULL, NULL); - pa_stream_set_write_callback (pulsesink->stream, NULL, NULL); - pa_stream_set_latency_update_callback (pulsesink->stream, NULL, NULL); - - pa_stream_unref (pulsesink->stream); - pulsesink->stream = NULL; - } - - g_free (pulsesink->stream_name); - pulsesink->stream_name = NULL; - - g_free (pulsesink->device_description); - pulsesink->device_description = NULL; -} - -static void -gst_pulsesink_destroy_context (GstPulseSink * pulsesink) -{ - - gst_pulsesink_destroy_stream (pulsesink); - - if (pulsesink->context) { - pa_context_disconnect (pulsesink->context); - - /* Make sure we don't get any further callbacks */ - pa_context_set_state_callback (pulsesink->context, NULL, NULL); - pa_context_set_subscribe_callback (pulsesink->context, NULL, NULL); - - pa_context_unref (pulsesink->context); - pulsesink->context = NULL; - } + /* override with a custom clock */ + if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock) + gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock); + GST_BASE_AUDIO_SINK (pulsesink)->provided_clock = + gst_audio_clock_new ("GstPulseSinkClock", + (GstAudioClockGetTimeFunc) gst_pulse_sink_get_time, pulsesink); } static void gst_pulsesink_finalize (GObject * object) { - GstPulseSink *pulsesink = GST_PULSESINK (object); + GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); pa_threaded_mainloop_stop (pulsesink->mainloop); - gst_pulsesink_destroy_context (pulsesink); - g_free (pulsesink->server); g_free (pulsesink->device); @@ -344,203 +1350,208 @@ gst_pulsesink_finalize (GObject * object) #if HAVE_PULSE_0_9_12 static void -gst_pulsesink_set_volume (GstPulseSink * pulsesink, gdouble volume) +gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume) { pa_cvolume v; pa_operation *o = NULL; + GstPulseRingBuffer *pbuf; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); - pulsesink->volume = volume; - pulsesink->volume_set = TRUE; + psink->volume = volume; + psink->volume_set = TRUE; - if (!pulsesink->stream) + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL) goto unlock; - gst_pulse_cvolume_from_linear (&v, pulsesink->sample_spec.channels, volume); + gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume); - if (!(o = pa_context_set_sink_input_volume (pulsesink->context, - pa_stream_get_index (pulsesink->stream), &v, NULL, NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_set_sink_input_volume() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } + if (!(o = pa_context_set_sink_input_volume (pbuf->context, + pa_stream_get_index (pbuf->stream), &v, NULL, NULL))) + goto volume_failed; /* We don't really care about the result of this call */ - unlock: if (o) pa_operation_unref (o); - pa_threaded_mainloop_unlock (pulsesink->mainloop); + pa_threaded_mainloop_unlock (psink->mainloop); + + return; + + /* ERRORS */ +volume_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_set_sink_input_volume() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; + } } static void gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i, int eol, void *userdata) { - GstPulseSink *pulsesink = GST_PULSESINK (userdata); + GstPulseRingBuffer *pbuf; + GstPulseSink *psink; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); if (!i) return; - if (!pulsesink->stream) + if (!pbuf->stream) return; - g_assert (i->index == pa_stream_get_index (pulsesink->stream)); + g_assert (i->index == pa_stream_get_index (pbuf->stream)); - pulsesink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume)); + psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume)); } static gdouble -gst_pulsesink_get_volume (GstPulseSink * pulsesink) +gst_pulsesink_get_volume (GstPulseSink * psink) { + GstPulseRingBuffer *pbuf; pa_operation *o = NULL; gdouble v; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); - if (!pulsesink->stream) - goto unlock; + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL || pbuf->stream == NULL) + goto no_buffer; - if (!(o = pa_context_get_sink_input_info (pulsesink->context, - pa_stream_get_index (pulsesink->stream), - gst_pulsesink_sink_input_info_cb, pulsesink))) { - - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_get_sink_input_info() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } + if (!(o = pa_context_get_sink_input_info (pbuf->context, + pa_stream_get_index (pbuf->stream), + gst_pulsesink_sink_input_info_cb, pbuf))) + goto info_failed; while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { - - if (gst_pulsesink_is_dead (pulsesink)) + pa_threaded_mainloop_wait (psink->mainloop); + if (gst_pulsering_is_dead (psink, pbuf)) goto unlock; - - pa_threaded_mainloop_wait (pulsesink->mainloop); } unlock: - if (o) pa_operation_unref (o); - v = pulsesink->volume; - - pa_threaded_mainloop_unlock (pulsesink->mainloop); + v = psink->volume; + pa_threaded_mainloop_unlock (psink->mainloop); return v; + + /* ERRORS */ +no_buffer: + { + GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); + goto unlock; + } +info_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_get_sink_input_info() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; + } } #endif -static gboolean -gst_pulsesink_is_dead (GstPulseSink * pulsesink) -{ - - if (!pulsesink->context - || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pulsesink->context)) - || !pulsesink->stream - || !PA_STREAM_IS_GOOD (pa_stream_get_state (pulsesink->stream))) { - const gchar *err_str = pulsesink->context ? - pa_strerror (pa_context_errno (pulsesink->context)) : NULL; - - GST_ELEMENT_ERROR ((pulsesink), RESOURCE, FAILED, ("Disconnected: %s", - err_str), (NULL)); - return TRUE; - } - - return FALSE; -} - static void gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol, void *userdata) { - GstPulseSink *pulsesink = GST_PULSESINK (userdata); + GstPulseRingBuffer *pbuf; + GstPulseSink *psink; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); if (!i) return; - if (!pulsesink->stream) + if (!pbuf->stream) return; - g_assert (i->index == pa_stream_get_device_index (pulsesink->stream)); + g_assert (i->index == pa_stream_get_device_index (pbuf->stream)); - g_free (pulsesink->device_description); - pulsesink->device_description = g_strdup (i->description); + g_free (psink->device_description); + psink->device_description = g_strdup (i->description); } static gchar * -gst_pulsesink_device_description (GstPulseSink * pulsesink) +gst_pulsesink_device_description (GstPulseSink * psink) { + GstPulseRingBuffer *pbuf; pa_operation *o = NULL; gchar *t; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL || pbuf->stream == NULL) + goto no_buffer; - if (!pulsesink->stream) - goto unlock; - - if (!(o = pa_context_get_sink_info_by_index (pulsesink->context, - pa_stream_get_device_index (pulsesink->stream), - gst_pulsesink_sink_info_cb, pulsesink))) { - - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_get_sink_info() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } + if (!(o = pa_context_get_sink_info_by_index (pbuf->context, + pa_stream_get_device_index (pbuf->stream), + gst_pulsesink_sink_info_cb, pbuf))) + goto info_failed; while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { - - if (gst_pulsesink_is_dead (pulsesink)) + pa_threaded_mainloop_wait (psink->mainloop); + if (gst_pulsering_is_dead (psink, pbuf)) goto unlock; - - pa_threaded_mainloop_wait (pulsesink->mainloop); } unlock: - if (o) pa_operation_unref (o); - t = g_strdup (pulsesink->device_description); - - pa_threaded_mainloop_unlock (pulsesink->mainloop); + t = g_strdup (psink->device_description); + pa_threaded_mainloop_unlock (psink->mainloop); return t; + + /* ERRORS */ +no_buffer: + { + GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); + goto unlock; + } +info_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_get_sink_info() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; + } } static void gst_pulsesink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstPulseSink *pulsesink = GST_PULSESINK (object); + GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); switch (prop_id) { case PROP_SERVER: g_free (pulsesink->server); pulsesink->server = g_value_dup_string (value); - if (pulsesink->probe) gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server); - break; - case PROP_DEVICE: g_free (pulsesink->device); pulsesink->device = g_value_dup_string (value); break; - #if HAVE_PULSE_0_9_12 case PROP_VOLUME: gst_pulsesink_set_volume (pulsesink, g_value_get_double (value)); break; #endif - default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -552,30 +1563,26 @@ gst_pulsesink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { - GstPulseSink *pulsesink = GST_PULSESINK (object); + GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); switch (prop_id) { case PROP_SERVER: g_value_set_string (value, pulsesink->server); break; - case PROP_DEVICE: g_value_set_string (value, pulsesink->device); break; - case PROP_DEVICE_NAME:{ char *t = gst_pulsesink_device_description (pulsesink); g_value_set_string (value, t); g_free (t); break; } - #if HAVE_PULSE_0_9_12 case PROP_VOLUME: g_value_set_double (value, gst_pulsesink_get_volume (pulsesink)); break; #endif - default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -583,517 +1590,58 @@ gst_pulsesink_get_property (GObject * object, } static void -gst_pulsesink_context_state_cb (pa_context * c, void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); - - switch (pa_context_get_state (c)) { - case PA_CONTEXT_READY: - case PA_CONTEXT_TERMINATED: - case PA_CONTEXT_FAILED: - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); - break; - - case PA_CONTEXT_UNCONNECTED: - case PA_CONTEXT_CONNECTING: - case PA_CONTEXT_AUTHORIZING: - case PA_CONTEXT_SETTING_NAME: - break; - } -} - -#if HAVE_PULSE_0_9_12 -static void -gst_pulsesink_context_subscribe_cb (pa_context * c, - pa_subscription_event_type_t t, uint32_t idx, void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); - - if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) && - t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW)) - return; - - if (!pulsesink->stream) - return; - - if (idx != pa_stream_get_index (pulsesink->stream)) - return; - - /* Actually this event is also triggered when other properties of - * the stream change that are unrelated to the volume. However it is - * probably cheaper to signal the change here and check for the - * volume when the GObject property is read instead of querying it always. */ - - /* inform streaming thread to notify */ - g_atomic_int_compare_and_exchange (&pulsesink->notify, 0, 1); -} -#endif - -static void -gst_pulsesink_stream_state_cb (pa_stream * s, void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); - - switch (pa_stream_get_state (s)) { - - case PA_STREAM_READY: - case PA_STREAM_FAILED: - case PA_STREAM_TERMINATED: - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); - break; - - case PA_STREAM_UNCONNECTED: - case PA_STREAM_CREATING: - break; - } -} - -static void -gst_pulsesink_stream_request_cb (pa_stream * s, size_t length, void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); - - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); -} - -static void -gst_pulsesink_stream_latency_update_cb (pa_stream * s, void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); - - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); -} - -static gboolean -gst_pulsesink_open (GstAudioSink * asink) -{ - GstPulseSink *pulsesink = GST_PULSESINK (asink); - gchar *name = gst_pulse_client_name (); - - pa_threaded_mainloop_lock (pulsesink->mainloop); - - g_assert (!pulsesink->context); - g_assert (!pulsesink->stream); - - if (!(pulsesink->context = - pa_context_new (pa_threaded_mainloop_get_api (pulsesink->mainloop), - name))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("Failed to create context"), (NULL)); - goto unlock_and_fail; - } - - pa_context_set_state_callback (pulsesink->context, - gst_pulsesink_context_state_cb, pulsesink); -#if HAVE_PULSE_0_9_12 - pa_context_set_subscribe_callback (pulsesink->context, - gst_pulsesink_context_subscribe_cb, pulsesink); -#endif - - if (pa_context_connect (pulsesink->context, pulsesink->server, 0, NULL) < 0) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Failed to connect: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - - for (;;) { - pa_context_state_t state; - - state = pa_context_get_state (pulsesink->context); - - if (!PA_CONTEXT_IS_GOOD (state)) { - GST_DEBUG_OBJECT (pulsesink, "Context state was not READY. Got: %d", - state); - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Failed to connect: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - - if (state == PA_CONTEXT_READY) - break; - - /* Wait until the context is ready */ - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - - pa_threaded_mainloop_unlock (pulsesink->mainloop); - g_free (name); - return TRUE; - -unlock_and_fail: - - gst_pulsesink_destroy_context (pulsesink); - - pa_threaded_mainloop_unlock (pulsesink->mainloop); - g_free (name); - return FALSE; -} - -static gboolean -gst_pulsesink_close (GstAudioSink * asink) -{ - GstPulseSink *pulsesink = GST_PULSESINK (asink); - - pa_threaded_mainloop_lock (pulsesink->mainloop); - gst_pulsesink_destroy_context (pulsesink); - pa_threaded_mainloop_unlock (pulsesink->mainloop); - - return TRUE; -} - -static gboolean -gst_pulsesink_prepare (GstAudioSink * asink, GstRingBufferSpec * spec) -{ - pa_buffer_attr buf_attr; - pa_channel_map channel_map; - GstPulseSink *pulsesink = GST_PULSESINK (asink); - pa_operation *o = NULL; - pa_cvolume v; - - if (!gst_pulse_fill_sample_spec (spec, &pulsesink->sample_spec)) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, SETTINGS, - ("Invalid sample specification."), (NULL)); - return FALSE; - } - - pa_threaded_mainloop_lock (pulsesink->mainloop); - - if (!pulsesink->context) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Bad context"), (NULL)); - goto unlock_and_fail; - } - - g_assert (!pulsesink->stream); - - if (!(o = - pa_context_subscribe (pulsesink->context, - PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) { - const gchar *err_str = pulsesink->context ? - pa_strerror (pa_context_errno (pulsesink->context)) : NULL; - - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_context_subscribe() failed: %s", err_str), (NULL)); - goto unlock_and_fail; - } - - pa_operation_unref (o); - - if (!(pulsesink->stream = pa_stream_new (pulsesink->context, - pulsesink->stream_name ? - pulsesink->stream_name : "Playback Stream", - &pulsesink->sample_spec, - gst_pulse_gst_to_channel_map (&channel_map, spec)))) { - const gchar *err_str = pulsesink->context ? - pa_strerror (pa_context_errno (pulsesink->context)) : NULL; - - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("Failed to create stream: %s", err_str), (NULL)); - goto unlock_and_fail; - } - - pa_stream_set_state_callback (pulsesink->stream, - gst_pulsesink_stream_state_cb, pulsesink); - pa_stream_set_write_callback (pulsesink->stream, - gst_pulsesink_stream_request_cb, pulsesink); - pa_stream_set_latency_update_callback (pulsesink->stream, - gst_pulsesink_stream_latency_update_cb, pulsesink); - - memset (&buf_attr, 0, sizeof (buf_attr)); - buf_attr.tlength = spec->segtotal * spec->segsize; - buf_attr.maxlength = buf_attr.tlength * 2; - buf_attr.prebuf = buf_attr.tlength; - buf_attr.minreq = spec->segsize; - - if (pulsesink->volume_set) - gst_pulse_cvolume_from_linear (&v, pulsesink->sample_spec.channels, - pulsesink->volume); - - if (pa_stream_connect_playback (pulsesink->stream, pulsesink->device, - &buf_attr, - PA_STREAM_INTERPOLATE_TIMING | - PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_NOT_MONOTONOUS | -#if HAVE_PULSE_0_9_11 - PA_STREAM_ADJUST_LATENCY | -#endif - PA_STREAM_START_CORKED, pulsesink->volume_set ? &v : NULL, NULL) < 0) { - - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - pulsesink->corked = TRUE; - - for (;;) { - pa_stream_state_t state; - - state = pa_stream_get_state (pulsesink->stream); - - if (!PA_STREAM_IS_GOOD (state)) { - GST_DEBUG_OBJECT (pulsesink, "Stream state was not READY. Got: %d", - state); - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - - if (state == PA_STREAM_READY) - break; - - /* Wait until the stream is ready */ - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - - pa_threaded_mainloop_unlock (pulsesink->mainloop); - return TRUE; - -unlock_and_fail: - - gst_pulsesink_destroy_stream (pulsesink); - - pa_threaded_mainloop_unlock (pulsesink->mainloop); - - return FALSE; -} - -static gboolean -gst_pulsesink_unprepare (GstAudioSink * asink) -{ - GstPulseSink *pulsesink = GST_PULSESINK (asink); - - pa_threaded_mainloop_lock (pulsesink->mainloop); - gst_pulsesink_destroy_stream (pulsesink); - pa_threaded_mainloop_unlock (pulsesink->mainloop); - - return TRUE; -} - -static guint -gst_pulsesink_write (GstAudioSink * asink, gpointer data, guint length) -{ - GstPulseSink *pulsesink = GST_PULSESINK (asink); - pa_operation *o = NULL; - size_t sum = 0; - - /* FIXME post message rather than using a signal (as mixer interface) */ - if (g_atomic_int_compare_and_exchange (&pulsesink->notify, 1, 0)) - g_object_notify (G_OBJECT (pulsesink), "volume"); - - pa_threaded_mainloop_lock (pulsesink->mainloop); - - pulsesink->in_write = TRUE; - - /* Make sure the stream is uncorked - it might not be on a caps change */ - if (pulsesink->corked) { - if (!(o = pa_stream_cork (pulsesink->stream, FALSE, NULL, NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_cork() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - pulsesink->corked = FALSE; - - pa_operation_unref (o); - o = NULL; - } - - while (length > 0) { - size_t l; - - for (;;) { - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; - - if ((l = pa_stream_writable_size (pulsesink->stream)) == (size_t) - 1) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_writable_size() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - - if (l >= length) - break; - - if (pulsesink->did_reset) - goto unlock_and_fail; - - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - - if (l > length) - l = length; - - if (pa_stream_write (pulsesink->stream, data, l, NULL, 0, - PA_SEEK_RELATIVE) < 0) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_write() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - - data = (guint8 *) data + l; - length -= l; - - sum += l; - } - - pulsesink->did_reset = FALSE; - pulsesink->in_write = FALSE; - - pa_threaded_mainloop_unlock (pulsesink->mainloop); - return sum; - -unlock_and_fail: - - pulsesink->did_reset = FALSE; - pulsesink->in_write = FALSE; - - if (o) - pa_operation_unref (o); - - pa_threaded_mainloop_unlock (pulsesink->mainloop); - return (guint) - 1; -} - -static guint -gst_pulsesink_delay (GstAudioSink * asink) -{ - GstPulseSink *pulsesink = GST_PULSESINK (asink); - pa_usec_t t; - - pa_threaded_mainloop_lock (pulsesink->mainloop); - - for (;;) { - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; - - if (pa_stream_get_latency (pulsesink->stream, &t, NULL) >= 0) - break; - - if (pa_context_errno (pulsesink->context) != PA_ERR_NODATA) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_get_latency() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - - pa_threaded_mainloop_unlock (pulsesink->mainloop); - - return gst_util_uint64_scale_int (t, pulsesink->sample_spec.rate, 1000000LL); - -unlock_and_fail: - - pa_threaded_mainloop_unlock (pulsesink->mainloop); - return 0; -} - -static void -gst_pulsesink_success_cb (pa_stream * s, int success, void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); - - pulsesink->operation_success = !!success; - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); -} - -static void -gst_pulsesink_reset (GstAudioSink * asink) -{ - GstPulseSink *pulsesink = GST_PULSESINK (asink); - pa_operation *o = NULL; - - pa_threaded_mainloop_lock (pulsesink->mainloop); - - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; - - if (!(o = - pa_stream_flush (pulsesink->stream, gst_pulsesink_success_cb, - pulsesink))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_flush() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - - /* Inform anyone waiting in _write() call that it shall wakeup */ - if (pulsesink->in_write) { - pulsesink->did_reset = TRUE; - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); - } - - pulsesink->operation_success = FALSE; - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { - - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; - - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - - if (!pulsesink->operation_success) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Flush failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - -unlock_and_fail: - - if (o) { - pa_operation_cancel (o); - pa_operation_unref (o); - } - - pa_threaded_mainloop_unlock (pulsesink->mainloop); -} - -static void -gst_pulsesink_change_title (GstPulseSink * pulsesink, const gchar * t) +gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t) { pa_operation *o = NULL; + GstPulseRingBuffer *pbuf; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); - g_free (pulsesink->stream_name); - pulsesink->stream_name = g_strdup (t); + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL) + goto no_buffer; - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; + g_free (pbuf->stream_name); + pbuf->stream_name = g_strdup (t); - if (!(o = - pa_stream_set_name (pulsesink->stream, pulsesink->stream_name, NULL, - NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_set_name() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } + if (gst_pulsering_is_dead (psink, pbuf)) + goto server_dead; + + if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL))) + goto name_failed; /* We're not interested if this operation failed or not */ - unlock: - if (o) pa_operation_unref (o); + pa_threaded_mainloop_unlock (psink->mainloop); - pa_threaded_mainloop_unlock (pulsesink->mainloop); + return; + + /* ERRORS */ +no_buffer: + { + GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); + goto unlock; + } +server_dead: + { + GST_DEBUG_OBJECT (psink, "the server is dead"); + goto unlock; + } +name_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_set_name() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; + } } #if HAVE_PULSE_0_9_11 static void -gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l) +gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l) { - static const gchar *const map[] = { GST_TAG_TITLE, PA_PROP_MEDIA_TITLE, GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST, @@ -1102,11 +1650,11 @@ gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l) /* We might add more here later on ... */ NULL }; - pa_proplist *pl = NULL; const gchar *const *t; gboolean empty = TRUE; pa_operation *o = NULL; + GstPulseRingBuffer *pbuf; pl = pa_proplist_new (); @@ -1123,44 +1671,61 @@ gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l) g_free (n); } } - if (empty) goto finish; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL) + goto no_buffer; - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; + if (gst_pulsering_is_dead (psink, pbuf)) + goto server_dead; - if (!(o = - pa_stream_proplist_update (pulsesink->stream, PA_UPDATE_REPLACE, pl, - NULL, NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_proplist_update() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } + if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE, + pl, NULL, NULL))) + goto update_failed; /* We're not interested if this operation failed or not */ - unlock: if (o) pa_operation_unref (o); - pa_threaded_mainloop_unlock (pulsesink->mainloop); + pa_threaded_mainloop_unlock (psink->mainloop); finish: if (pl) pa_proplist_free (pl); + + return; + + /* ERRORS */ +no_buffer: + { + GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); + goto unlock; + } +server_dead: + { + GST_DEBUG_OBJECT (psink, "the server is dead"); + goto unlock; + } +update_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_proplist_update() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; + } } #endif static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event) { - GstPulseSink *pulsesink = GST_PULSESINK (sink); + GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_TAG:{ @@ -1207,61 +1772,3 @@ gst_pulsesink_event (GstBaseSink * sink, GstEvent * event) return GST_BASE_SINK_CLASS (parent_class)->event (sink, event); } - -static void -gst_pulsesink_pause (GstPulseSink * pulsesink, gboolean b) -{ - pa_operation *o = NULL; - - pa_threaded_mainloop_lock (pulsesink->mainloop); - - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; - - if (!(o = pa_stream_cork (pulsesink->stream, b, NULL, NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_cork() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } - - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - pulsesink->corked = b; - -unlock: - if (o) - pa_operation_unref (o); - - pa_threaded_mainloop_unlock (pulsesink->mainloop); -} - - -static GstStateChangeReturn -gst_pulsesink_change_state (GstElement * element, GstStateChange transition) -{ - GstStateChangeReturn res; - GstPulseSink *this = GST_PULSESINK (element); - - switch (transition) { - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - gst_pulsesink_pause (this, FALSE); - break; - default: - break; - } - - res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - switch (transition) { - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - gst_pulsesink_pause (this, TRUE); - break; - default: - break; - } - return res; -} diff --git a/ext/pulse/pulsesink.h b/ext/pulse/pulsesink.h index 9ec626cd53..2f74ac3af4 100644 --- a/ext/pulse/pulsesink.h +++ b/ext/pulse/pulsesink.h @@ -42,39 +42,31 @@ G_BEGIN_DECLS (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSESINK)) #define GST_IS_PULSESINK_CLASS(obj) \ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSESINK)) +#define GST_PULSESINK_CAST(obj) \ + ((GstPulseSink *)(obj)) typedef struct _GstPulseSink GstPulseSink; typedef struct _GstPulseSinkClass GstPulseSinkClass; struct _GstPulseSink { - GstAudioSink sink; + GstBaseAudioSink sink; gchar *server, *device, *stream_name; + gchar *device_description; pa_threaded_mainloop *mainloop; - pa_context *context; - pa_stream *stream; - - pa_sample_spec sample_spec; - GstPulseProbe *probe; gdouble volume; gboolean volume_set; - - gchar *device_description; - - gboolean operation_success; - gboolean did_reset, in_write; - gboolean corked; gint notify; }; struct _GstPulseSinkClass { - GstAudioSinkClass parent_class; + GstBaseAudioSinkClass parent_class; }; GType gst_pulsesink_get_type (void);