/*
 *  GStreamer pulseaudio plugin
 *
 *  Copyright (c) 2004-2008 Lennart Poettering
 *
 *  gst-pulse is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU Lesser General Public License as
 *  published by the Free Software Foundation; either version 2.1 of the
 *  License, or (at your option) any later version.
 *
 *  gst-pulse is distributed in the hope that it will be useful, but
 *  WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 *  Lesser General Public License for more details.
 *
 *  You should have received a copy of the GNU Lesser General Public
 *  License along with gst-pulse; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
 *  USA.
 */

/**
 * SECTION:element-pulsesrc
 * @title: pulsesrc
 * @see_also: pulsesink
 *
 * This element captures audio from a
 * [PulseAudio sound server](http://www.pulseaudio.org).
 *
 * ## Example pipelines
 * |[
 * gst-launch-1.0 -v pulsesrc ! audioconvert ! vorbisenc ! oggmux ! filesink location=alsasrc.ogg
 * ]| Record from a sound card using pulseaudio and encode to Ogg/Vorbis.
 *
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <string.h>
#include <stdio.h>

#include <gst/base/gstbasesrc.h>
#include <gst/gsttaglist.h>
#include <gst/audio/audio.h>

#include "gstpulseelements.h"
#include "pulsesrc.h"
#include "pulseutil.h"

/* Use the externally defined "pulse_debug" category as the default debug
 * category for every GST_* logging macro in this file. */
GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
#define GST_CAT_DEFAULT pulse_debug

/* Default values of the string properties: all of them start out unset. */
#define DEFAULT_SERVER            NULL
#define DEFAULT_DEVICE            NULL
#define DEFAULT_CURRENT_DEVICE    NULL
#define DEFAULT_DEVICE_NAME       NULL

/* Defaults for the "volume" and "mute" properties. MAX_VOLUME is both the
 * upper bound of the "volume" property and the value that volumes reported
 * by the server are clipped to. */
#define DEFAULT_VOLUME          1.0
#define DEFAULT_MUTE            FALSE
#define MAX_VOLUME              10.0

/* See the pulsesink code for notes on how we interact with the PA mainloop
 * thread. */

/* GObject property identifiers. Each id (except PROP_0 and PROP_LAST) is
 * installed in gst_pulsesrc_class_init() and dispatched on in
 * gst_pulsesrc_set_property() / gst_pulsesrc_get_property(). */
enum
{
  PROP_0,
  PROP_SERVER,
  PROP_DEVICE,
  PROP_DEVICE_NAME,
  PROP_CURRENT_DEVICE,
  PROP_CLIENT_NAME,
  PROP_STREAM_PROPERTIES,
  PROP_SOURCE_OUTPUT_INDEX,
  PROP_VOLUME,
  PROP_MUTE,
  PROP_LAST
};

/* Forward declarations of the file-local functions. */

/* Teardown helpers for the PulseAudio stream and context. */
static void gst_pulsesrc_destroy_stream (GstPulseSrc * pulsesrc);
static void gst_pulsesrc_destroy_context (GstPulseSrc * pulsesrc);

/* GObject vfuncs. */
static void gst_pulsesrc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_pulsesrc_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_pulsesrc_finalize (GObject * object);

/* GstAudioSrc vfuncs and related helpers. */
static gboolean gst_pulsesrc_set_corked (GstPulseSrc * psrc, gboolean corked,
    gboolean wait);
static gboolean gst_pulsesrc_open (GstAudioSrc * asrc);

static gboolean gst_pulsesrc_close (GstAudioSrc * asrc);

static gboolean gst_pulsesrc_prepare (GstAudioSrc * asrc,
    GstAudioRingBufferSpec * spec);

static gboolean gst_pulsesrc_unprepare (GstAudioSrc * asrc);

static guint gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data,
    guint length, GstClockTime * timestamp);
static guint gst_pulsesrc_delay (GstAudioSrc * asrc);

static void gst_pulsesrc_reset (GstAudioSrc * src);

/* GstBaseSrc vfuncs. */
static gboolean gst_pulsesrc_negotiate (GstBaseSrc * basesrc);
static gboolean gst_pulsesrc_event (GstBaseSrc * basesrc, GstEvent * event);

/* GstElement vfunc. */
static GstStateChangeReturn gst_pulsesrc_change_state (GstElement *
    element, GstStateChange transition);

/* Time callback handed to the custom audio clock created in
 * gst_pulsesrc_init(). */
static GstClockTime gst_pulsesrc_get_time (GstClock * clock, GstPulseSrc * src);

/* Define the GstPulseSrc type as a subclass of GST_TYPE_AUDIO_SRC that also
 * implements the GstStreamVolume interface (no interface init function is
 * supplied), and define the registration of the "pulsesrc" element with rank
 * GST_RANK_PRIMARY + 10, running pulse_element_init (plugin) as part of it. */
#define gst_pulsesrc_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstPulseSrc, gst_pulsesrc, GST_TYPE_AUDIO_SRC,
    G_IMPLEMENT_INTERFACE (GST_TYPE_STREAM_VOLUME, NULL));
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (pulsesrc, "pulsesrc",
    GST_RANK_PRIMARY + 10, GST_TYPE_PULSESRC, pulse_element_init (plugin));

/* Class initializer: hooks up the GObject, GstElement, GstBaseSrc and
 * GstAudioSrc virtual methods, installs all element properties, sets the
 * element metadata and adds the always-present "src" pad template. */
static void
gst_pulsesrc_class_init (GstPulseSrcClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstAudioSrcClass *gstaudiosrc_class = GST_AUDIO_SRC_CLASS (klass);
  GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
  GstCaps *caps;
  gchar *clientname;

  gobject_class->finalize = gst_pulsesrc_finalize;
  gobject_class->set_property = gst_pulsesrc_set_property;
  gobject_class->get_property = gst_pulsesrc_get_property;

  gstelement_class->change_state =
      GST_DEBUG_FUNCPTR (gst_pulsesrc_change_state);

  gstbasesrc_class->event = GST_DEBUG_FUNCPTR (gst_pulsesrc_event);
  gstbasesrc_class->negotiate = GST_DEBUG_FUNCPTR (gst_pulsesrc_negotiate);

  gstaudiosrc_class->open = GST_DEBUG_FUNCPTR (gst_pulsesrc_open);
  gstaudiosrc_class->close = GST_DEBUG_FUNCPTR (gst_pulsesrc_close);
  gstaudiosrc_class->prepare = GST_DEBUG_FUNCPTR (gst_pulsesrc_prepare);
  gstaudiosrc_class->unprepare = GST_DEBUG_FUNCPTR (gst_pulsesrc_unprepare);
  gstaudiosrc_class->read = GST_DEBUG_FUNCPTR (gst_pulsesrc_read);
  gstaudiosrc_class->delay = GST_DEBUG_FUNCPTR (gst_pulsesrc_delay);
  gstaudiosrc_class->reset = GST_DEBUG_FUNCPTR (gst_pulsesrc_reset);

  /* Install the GObject properties */
  g_object_class_install_property (gobject_class,
      PROP_SERVER,
      g_param_spec_string ("server", "Server",
          "The PulseAudio server to connect to", DEFAULT_SERVER,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_DEVICE,
      g_param_spec_string ("device", "Device",
          "The PulseAudio source device to connect to", DEFAULT_DEVICE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /* read-only: queried from the server on every read */
  g_object_class_install_property (gobject_class, PROP_CURRENT_DEVICE,
      g_param_spec_string ("current-device", "Current Device",
          "The current PulseAudio source device", DEFAULT_CURRENT_DEVICE,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /* read-only: queried from the server on every read */
  g_object_class_install_property (gobject_class,
      PROP_DEVICE_NAME,
      g_param_spec_string ("device-name", "Device name",
          "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /* The default client name is computed at class-init time; the temporary
   * string is only needed while creating the param spec and is freed right
   * after. */
  clientname = gst_pulse_client_name ();
  /**
   * GstPulseSrc:client-name:
   *
   * The PulseAudio client name to use.
   */
  g_object_class_install_property (gobject_class,
      PROP_CLIENT_NAME,
      g_param_spec_string ("client-name", "Client Name",
          "The PulseAudio client_name_to_use", clientname,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
          GST_PARAM_MUTABLE_READY));
  g_free (clientname);

  /**
   * GstPulseSrc:stream-properties:
   *
   * List of pulseaudio stream properties. A list of defined properties can be
   * found in the [pulseaudio api docs](http://0pointer.de/lennart/projects/pulseaudio/doxygen/proplist_8h.html).
   *
   * Below is an example for registering as a music application to pulseaudio.
   * |[
   * GstStructure *props;
   *
   * props = gst_structure_from_string ("props,media.role=music", NULL);
   * g_object_set (pulse, "stream-properties", props, NULL);
   * gst_structure_free (props);
   * ]|
   */
  g_object_class_install_property (gobject_class,
      PROP_STREAM_PROPERTIES,
      g_param_spec_boxed ("stream-properties", "stream properties",
          "list of pulseaudio stream properties",
          GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstPulseSrc:source-output-index:
   *
   * The index of the PulseAudio source output corresponding to this element.
   */
  g_object_class_install_property (gobject_class,
      PROP_SOURCE_OUTPUT_INDEX,
      g_param_spec_uint ("source-output-index", "source output index",
          "The index of the PulseAudio source output corresponding to this "
          "record stream", 0, G_MAXUINT, PA_INVALID_INDEX,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  gst_element_class_set_static_metadata (gstelement_class,
      "PulseAudio Audio Source",
      "Source/Audio",
      "Captures audio from a PulseAudio server", "Lennart Poettering");

  /* Build the src pad template from the PCM caps string; our reference to
   * the caps is dropped once the template has been created. */
  caps = gst_pulse_fix_pcm_caps (gst_caps_from_string (_PULSE_CAPS_PCM));
  gst_element_class_add_pad_template (gstelement_class,
      gst_pad_template_new ("src", GST_PAD_SRC, GST_PAD_ALWAYS, caps));
  gst_caps_unref (caps);

  /**
   * GstPulseSrc:volume:
   *
   * The volume of the record stream.
   */
  g_object_class_install_property (gobject_class,
      PROP_VOLUME, g_param_spec_double ("volume", "Volume",
          "Linear volume of this stream, 1.0=100%",
          0.0, MAX_VOLUME, DEFAULT_VOLUME,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstPulseSrc:mute:
   *
   * Whether the stream is muted or not.
   */
  g_object_class_install_property (gobject_class,
      PROP_MUTE, g_param_spec_boolean ("mute", "Mute",
          "Mute state of this stream",
          DEFAULT_MUTE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}

/* Instance initializer: resets every field this file manages to its idle
 * value, selects the skew slave method and swaps the base class clock for a
 * GstAudioClock driven by gst_pulsesrc_get_time(). */
static void
gst_pulsesrc_init (GstPulseSrc * pulsesrc)
{
  GstAudioBaseSrc *base_src;

  /* connection settings */
  pulsesrc->client_name = gst_pulse_client_name ();
  pulsesrc->server = NULL;
  pulsesrc->device = NULL;
  pulsesrc->device_description = NULL;

  /* PulseAudio objects; nothing is connected yet */
  pulsesrc->context = NULL;
  pulsesrc->stream = NULL;
  pulsesrc->source_output_idx = PA_INVALID_INDEX;
  pulsesrc->stream_connected = FALSE;

  /* no peeked fragment pending */
  pulsesrc->read_buffer_length = 0;
  pulsesrc->read_buffer = NULL;

  pa_sample_spec_init (&pulsesrc->sample_spec);

  /* streaming state */
  pulsesrc->in_read = FALSE;
  pulsesrc->paused = TRUE;
  pulsesrc->operation_success = FALSE;

  /* volume / mute, not explicitly set by the user so far */
  pulsesrc->volume_set = FALSE;
  pulsesrc->volume = DEFAULT_VOLUME;
  pulsesrc->mute_set = FALSE;
  pulsesrc->mute = DEFAULT_MUTE;

  pulsesrc->notify = 0;

  pulsesrc->proplist = NULL;
  pulsesrc->properties = NULL;

  base_src = GST_AUDIO_BASE_SRC (pulsesrc);

  /* this should be the default but it isn't yet */
  gst_audio_base_src_set_slave_method (base_src,
      GST_AUDIO_BASE_SRC_SLAVE_SKEW);

  /* drop whatever clock the base class installed and provide our own */
  if (base_src->clock)
    gst_object_unref (base_src->clock);

  base_src->clock = gst_audio_clock_new ("GstPulseSrcClock",
      (GstAudioClockGetTimeFunc) gst_pulsesrc_get_time, pulsesrc, NULL);
}

/* Disconnects and releases the record stream, if there is one, resets the
 * stream related state and notifies "source-output-index". The cached device
 * description is dropped in any case. */
static void
gst_pulsesrc_destroy_stream (GstPulseSrc * pulsesrc)
{
  pa_stream *old_stream = pulsesrc->stream;

  if (old_stream != NULL) {
    pa_stream_disconnect (old_stream);
    pa_stream_unref (old_stream);

    pulsesrc->stream = NULL;
    pulsesrc->stream_connected = FALSE;
    pulsesrc->source_output_idx = PA_INVALID_INDEX;

    g_object_notify (G_OBJECT (pulsesrc), "source-output-index");
  }

  g_free (pulsesrc->device_description);
  pulsesrc->device_description = NULL;
}

/* Tears down the stream first and then, if a context exists, disconnects it,
 * removes our callbacks from it and drops our reference. */
static void
gst_pulsesrc_destroy_context (GstPulseSrc * pulsesrc)
{
  pa_context *ctx;

  gst_pulsesrc_destroy_stream (pulsesrc);

  ctx = pulsesrc->context;
  if (ctx == NULL)
    return;

  pa_context_disconnect (ctx);

  /* Make sure we don't get any further callbacks */
  pa_context_set_state_callback (ctx, NULL, NULL);
  pa_context_set_subscribe_callback (ctx, NULL, NULL);

  pa_context_unref (ctx);
  pulsesrc->context = NULL;
}

/* GObject finalize: frees the strings, the stream-properties structure and
 * the derived proplist owned by the instance, then chains up. */
static void
gst_pulsesrc_finalize (GObject * object)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (object);

  /* owned strings */
  g_free (self->server);
  g_free (self->device);
  g_free (self->client_name);
  g_free (self->current_source_name);

  /* only release what was actually set */
  if (self->properties != NULL)
    gst_structure_free (self->properties);

  if (self->proplist != NULL)
    pa_proplist_free (self->proplist);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

/* TRUE when the context/stream pointer is set and its state is a "good" one */
#define CONTEXT_OK(c) ((c) && PA_CONTEXT_IS_GOOD (pa_context_get_state ((c))))
#define STREAM_OK(s) ((s) && PA_STREAM_IS_GOOD (pa_stream_get_state ((s))))

/* Checks whether the connection to the server can still be used.
 *
 * Returns TRUE without posting anything when no stream is connected. When a
 * stream is connected but the context (or, if @check_stream is set, the
 * stream) is not in a good state, an element error is posted and TRUE is
 * returned. Returns FALSE when everything checked is fine. */
static gboolean
gst_pulsesrc_is_dead (GstPulseSrc * pulsesrc, gboolean check_stream)
{
  const gchar *err_str;

  if (!pulsesrc->stream_connected)
    return TRUE;

  if (!CONTEXT_OK (pulsesrc->context))
    goto error;

  if (check_stream && !STREAM_OK (pulsesrc->stream))
    goto error;

  return FALSE;

error:
  {
    /* There is no error code to look up without a context; never hand a NULL
     * pointer to the "%s" conversion. */
    err_str = pulsesrc->context ?
        pa_strerror (pa_context_errno (pulsesrc->context)) : NULL;
    GST_ELEMENT_ERROR ((pulsesrc), RESOURCE, FAILED, ("Disconnected: %s",
            GST_STR_NULL (err_str)), (NULL));
    return TRUE;
  }
}

/* Source info callback used by gst_pulsesrc_device_description(): stores a
 * copy of the reported description (when info is delivered) and always wakes
 * up the thread waiting on the mainloop. */
static void
gst_pulsesrc_source_info_cb (pa_context * c, const pa_source_info * i, int eol,
    void *userdata)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (userdata);

  if (i != NULL) {
    g_free (self->device_description);
    self->device_description = g_strdup (i->description);
  }

  pa_threaded_mainloop_signal (self->mainloop, 0);
}

/* Asks the server for the info of the source named by the "device" property
 * and returns a newly allocated copy of the cached description (possibly
 * NULL). Returns NULL right away when there is no mainloop. The caller owns
 * the returned string. */
static gchar *
gst_pulsesrc_device_description (GstPulseSrc * pulsesrc)
{
  pa_operation *o = NULL;
  gchar *t;

  if (!pulsesrc->mainloop)
    goto no_mainloop;

  pa_threaded_mainloop_lock (pulsesrc->mainloop);

  if (!(o = pa_context_get_source_info_by_name (pulsesrc->context,
              pulsesrc->device, gst_pulsesrc_source_info_cb, pulsesrc))) {

    /* name the call that actually failed */
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_context_get_source_info_by_name() failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock;
  }

  /* gst_pulsesrc_source_info_cb() signals the mainloop once it has run */
  while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {

    if (gst_pulsesrc_is_dead (pulsesrc, FALSE))
      goto unlock;

    pa_threaded_mainloop_wait (pulsesrc->mainloop);
  }

unlock:

  if (o)
    pa_operation_unref (o);

  /* copy while still holding the mainloop lock */
  t = g_strdup (pulsesrc->device_description);

  pa_threaded_mainloop_unlock (pulsesrc->mainloop);

  return t;

no_mainloop:
  {
    GST_DEBUG_OBJECT (pulsesrc, "have no mainloop");
    return NULL;
  }
}

/* Source output info callback: when the info belongs to our current source
 * output, caches its volume (clipped to MAX_VOLUME), mute state and source
 * index. Always wakes up the thread waiting on the mainloop. */
static void
gst_pulsesrc_source_output_info_cb (pa_context * c,
    const pa_source_output_info * i, int eol, void *userdata)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (userdata);

  /* If the index doesn't match our current stream,
   * it implies we just recreated the stream (caps change)
   */
  if (i != NULL && i->index == self->source_output_idx) {
    self->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
    self->mute = i->mute;
    self->current_source_idx = i->source;

    if (G_UNLIKELY (self->volume > MAX_VOLUME)) {
      GST_WARNING_OBJECT (self, "Clipped volume from %f to %f",
          self->volume, MAX_VOLUME);
      self->volume = MAX_VOLUME;
    }
  }

  pa_threaded_mainloop_signal (self->mainloop, 0);
}

/* Refreshes the cached volume / mute / source index from the server when a
 * mainloop and a source output index are available, then copies the cached
 * volume and mute state into @volume and @mute (each may be NULL). Without a
 * mainloop or stream index the cached values are returned unchanged. */
static void
gst_pulsesrc_get_source_output_info (GstPulseSrc * pulsesrc, gdouble * volume,
    gboolean * mute)
{
  pa_operation *op = NULL;
  gboolean locked = FALSE;

  if (pulsesrc->mainloop == NULL) {
    GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
  } else if (pulsesrc->source_output_idx == PA_INVALID_INDEX) {
    GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
  } else {
    pa_threaded_mainloop_lock (pulsesrc->mainloop);
    locked = TRUE;

    op = pa_context_get_source_output_info (pulsesrc->context,
        pulsesrc->source_output_idx, gst_pulsesrc_source_output_info_cb,
        pulsesrc);

    if (op == NULL) {
      GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
          ("pa_context_get_source_output_info() failed: %s",
              pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    } else {
      /* the callback signals the mainloop when it has been run */
      while (pa_operation_get_state (op) == PA_OPERATION_RUNNING) {
        pa_threaded_mainloop_wait (pulsesrc->mainloop);
        if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
          break;
      }
    }
  }

  /* hand out whatever is cached at this point */
  if (volume != NULL)
    *volume = pulsesrc->volume;
  if (mute != NULL)
    *mute = pulsesrc->mute;

  if (locked) {
    if (op != NULL)
      pa_operation_unref (op);

    pa_threaded_mainloop_unlock (pulsesrc->mainloop);
  }
}

/* Source info callback used by gst_pulsesrc_get_current_device(): caches the
 * name of the source when its index matches the cached current source index.
 * Always wakes up the thread waiting on the mainloop. */
static void
gst_pulsesrc_current_source_info_cb (pa_context * c, const pa_source_info * i,
    int eol, void *userdata)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (userdata);

  /* only info for the source index we currently have cached is of interest */
  if (i != NULL && i->index == self->current_source_idx) {
    g_free (self->current_source_name);
    self->current_source_name = g_strdup (i->name);
  }

  pa_threaded_mainloop_signal (self->mainloop, 0);
}

/* Returns a newly allocated copy of the name of the source the record stream
 * is currently attached to, or NULL when there is no mainloop or no stream
 * index (or no name has been cached). The caller owns the returned string.
 *
 * The source index is refreshed through
 * gst_pulsesrc_get_source_output_info() first, then the source with that
 * index is looked up. */
static gchar *
gst_pulsesrc_get_current_device (GstPulseSrc * pulsesrc)
{
  pa_operation *o = NULL;
  gchar *current_src;

  if (!pulsesrc->mainloop)
    goto no_mainloop;

  if (pulsesrc->source_output_idx == PA_INVALID_INDEX)
    goto no_index;

  /* updates pulsesrc->current_source_idx; takes and releases the mainloop
   * lock on its own, so it has to be called before we lock below */
  gst_pulsesrc_get_source_output_info (pulsesrc, NULL, NULL);

  pa_threaded_mainloop_lock (pulsesrc->mainloop);

  if (!(o = pa_context_get_source_info_by_index (pulsesrc->context,
              pulsesrc->current_source_idx, gst_pulsesrc_current_source_info_cb,
              pulsesrc)))
    goto info_failed;

  while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
    pa_threaded_mainloop_wait (pulsesrc->mainloop);
    if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
      goto unlock;
  }

unlock:

  /* copy while still holding the mainloop lock */
  current_src = g_strdup (pulsesrc->current_source_name);

  if (o)
    pa_operation_unref (o);

  pa_threaded_mainloop_unlock (pulsesrc->mainloop);

  return current_src;

  /* ERRORS */
no_mainloop:
  {
    GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
    return NULL;
  }
no_index:
  {
    GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
    return NULL;
  }
info_failed:
  {
    /* name the call that actually failed */
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_context_get_source_info_by_index() failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock;
  }
}

/* Applies the linear @volume to our source output on the server. When there
 * is no mainloop or no stream index yet, the value is only stored in the
 * instance and flagged through volume_set. */
static void
gst_pulsesrc_set_stream_volume (GstPulseSrc * pulsesrc, gdouble volume)
{
  pa_cvolume v;
  pa_operation *o = NULL;

  if (!pulsesrc->mainloop)
    goto no_mainloop;

  if (pulsesrc->source_output_idx == PA_INVALID_INDEX)
    goto no_index;

  pa_threaded_mainloop_lock (pulsesrc->mainloop);

  GST_DEBUG_OBJECT (pulsesrc, "setting volume to %f", volume);

  gst_pulse_cvolume_from_linear (&v, pulsesrc->sample_spec.channels, volume);

  if (!(o = pa_context_set_source_output_volume (pulsesrc->context,
              pulsesrc->source_output_idx, &v, NULL, NULL)))
    goto volume_failed;

  /* We don't really care about the result of this call */
unlock:

  if (o)
    pa_operation_unref (o);

  pa_threaded_mainloop_unlock (pulsesrc->mainloop);

  return;

  /* ERRORS */
no_mainloop:
  {
    pulsesrc->volume = volume;
    pulsesrc->volume_set = TRUE;
    GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
    return;
  }
no_index:
  {
    pulsesrc->volume = volume;
    pulsesrc->volume_set = TRUE;
    GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
    return;
  }
volume_failed:
  {
    /* name the call that actually failed */
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_context_set_source_output_volume() failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock;
  }
}

/* Applies the @mute state to our source output on the server. When there is
 * no mainloop or no stream index yet, the value is only stored in the
 * instance and flagged through mute_set. */
static void
gst_pulsesrc_set_stream_mute (GstPulseSrc * pulsesrc, gboolean mute)
{
  pa_operation *o = NULL;

  if (!pulsesrc->mainloop)
    goto no_mainloop;

  if (pulsesrc->source_output_idx == PA_INVALID_INDEX)
    goto no_index;

  pa_threaded_mainloop_lock (pulsesrc->mainloop);

  GST_DEBUG_OBJECT (pulsesrc, "setting mute state to %d", mute);

  if (!(o = pa_context_set_source_output_mute (pulsesrc->context,
              pulsesrc->source_output_idx, mute, NULL, NULL)))
    goto mute_failed;

  /* We don't really care about the result of this call */
unlock:

  if (o)
    pa_operation_unref (o);

  pa_threaded_mainloop_unlock (pulsesrc->mainloop);

  return;

  /* ERRORS */
no_mainloop:
  {
    pulsesrc->mute = mute;
    pulsesrc->mute_set = TRUE;
    GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
    return;
  }
no_index:
  {
    pulsesrc->mute = mute;
    pulsesrc->mute_set = TRUE;
    GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
    return;
  }
mute_failed:
  {
    /* name the call that actually failed */
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_context_set_source_output_mute() failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock;
  }
}

/* Asks the server to move our source output to the source named @device.
 * Does nothing (besides a debug message) when there is no mainloop or no
 * stream index. The result of the move operation itself is not awaited. */
static void
gst_pulsesrc_set_stream_device (GstPulseSrc * pulsesrc, const gchar * device)
{
  pa_operation *op;

  if (pulsesrc->mainloop == NULL) {
    GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
    return;
  }

  if (pulsesrc->source_output_idx == PA_INVALID_INDEX) {
    GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
    return;
  }

  pa_threaded_mainloop_lock (pulsesrc->mainloop);

  GST_DEBUG_OBJECT (pulsesrc, "setting stream device to %s", device);

  op = pa_context_move_source_output_by_name (pulsesrc->context,
      pulsesrc->source_output_idx, device, NULL, NULL);

  if (op != NULL) {
    pa_operation_unref (op);
  } else {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_context_move_source_output_by_name(%s) failed: %s",
            device, pa_strerror (pa_context_errno (pulsesrc->context))),
        (NULL));
  }

  pa_threaded_mainloop_unlock (pulsesrc->mainloop);
}

/* GObject set_property implementation.
 *
 * "device", "volume" and "mute" are forwarded to the server through the
 * gst_pulsesrc_set_stream_*() helpers; the remaining writable properties are
 * only stored in the instance. */
static void
gst_pulsesrc_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec)
{

  GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (object);

  switch (prop_id) {
    case PROP_SERVER:
      g_free (pulsesrc->server);
      pulsesrc->server = g_value_dup_string (value);
      break;
    case PROP_DEVICE:
      g_free (pulsesrc->device);
      pulsesrc->device = g_value_dup_string (value);
      gst_pulsesrc_set_stream_device (pulsesrc, pulsesrc->device);
      break;
    case PROP_CLIENT_NAME:
      g_free (pulsesrc->client_name);
      if (!g_value_get_string (value)) {
        GST_WARNING_OBJECT (pulsesrc,
            "Empty PulseAudio client name not allowed. Resetting to default value");
        pulsesrc->client_name = gst_pulse_client_name ();
      } else
        pulsesrc->client_name = g_value_dup_string (value);
      break;
    case PROP_STREAM_PROPERTIES:
    {
      const GstStructure *new_props = gst_value_get_structure (value);
      GstStructure *copy = NULL;

      /* Take the copy before releasing the old structure. A NULL value
       * clears the properties instead of passing NULL on to
       * gst_structure_copy() / gst_pulse_make_proplist(). */
      if (new_props)
        copy = gst_structure_copy (new_props);

      if (pulsesrc->properties)
        gst_structure_free (pulsesrc->properties);
      pulsesrc->properties = copy;

      if (pulsesrc->proplist)
        pa_proplist_free (pulsesrc->proplist);
      pulsesrc->proplist =
          copy ? gst_pulse_make_proplist (pulsesrc->properties) : NULL;
      break;
    }
    case PROP_VOLUME:
      gst_pulsesrc_set_stream_volume (pulsesrc, g_value_get_double (value));
      break;
    case PROP_MUTE:
      gst_pulsesrc_set_stream_mute (pulsesrc, g_value_get_boolean (value));
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/* GObject get_property implementation.
 *
 * "current-device", "device-name", "volume" and "mute" are fetched through
 * helpers that may query the server; everything else is read straight from
 * the instance. */
static void
gst_pulsesrc_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (object);

  switch (prop_id) {
    case PROP_SERVER:
      g_value_set_string (value, self->server);
      break;
    case PROP_DEVICE:
      g_value_set_string (value, self->device);
      break;
    case PROP_CURRENT_DEVICE:
    {
      gchar *name = gst_pulsesrc_get_current_device (self);

      /* report an empty string rather than NULL when nothing is known */
      if (name == NULL)
        g_value_set_string (value, "");
      else
        g_value_take_string (value, name);
      break;
    }
    case PROP_DEVICE_NAME:
    {
      gchar *description = gst_pulsesrc_device_description (self);

      /* the GValue takes ownership of the freshly allocated string */
      g_value_take_string (value, description);
      break;
    }
    case PROP_CLIENT_NAME:
      g_value_set_string (value, self->client_name);
      break;
    case PROP_STREAM_PROPERTIES:
      gst_value_set_structure (value, self->properties);
      break;
    case PROP_SOURCE_OUTPUT_INDEX:
      g_value_set_uint (value, self->source_output_idx);
      break;
    case PROP_VOLUME:
    {
      gdouble current_volume;

      gst_pulsesrc_get_source_output_info (self, &current_volume, NULL);
      g_value_set_double (value, current_volume);
      break;
    }
    case PROP_MUTE:
    {
      gboolean current_mute;

      gst_pulsesrc_get_source_output_info (self, NULL, &current_mute);
      g_value_set_boolean (value, current_mute);
      break;
    }
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/* Context state callback: wakes up the thread waiting on the mainloop once
 * the context is ready, has failed or was terminated. The intermediate
 * connection states are ignored. */
static void
gst_pulsesrc_context_state_cb (pa_context * c, void *userdata)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (userdata);
  pa_context_state_t state = pa_context_get_state (c);

  switch (state) {
    case PA_CONTEXT_UNCONNECTED:
    case PA_CONTEXT_CONNECTING:
    case PA_CONTEXT_AUTHORIZING:
    case PA_CONTEXT_SETTING_NAME:
      /* still on the way, nothing to report */
      break;

    case PA_CONTEXT_READY:
    case PA_CONTEXT_FAILED:
    case PA_CONTEXT_TERMINATED:
      pa_threaded_mainloop_signal (self->mainloop, 0);
      break;
  }
}

/* Stream state callback: wakes up the thread waiting on the mainloop once
 * the stream is ready, has failed or was terminated. */
static void
gst_pulsesrc_stream_state_cb (pa_stream * s, void *userdata)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (userdata);
  pa_stream_state_t state = pa_stream_get_state (s);

  switch (state) {
    case PA_STREAM_UNCONNECTED:
    case PA_STREAM_CREATING:
      /* still being set up, nothing to report */
      break;

    case PA_STREAM_READY:
    case PA_STREAM_TERMINATED:
    case PA_STREAM_FAILED:
      pa_threaded_mainloop_signal (self->mainloop, 0);
      break;
  }
}

/* Stream read callback: logs the announced length and, while
 * gst_pulsesrc_read() is in progress, wakes up the thread waiting on the
 * mainloop. */
static void
gst_pulsesrc_stream_request_cb (pa_stream * s, size_t length, void *userdata)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (userdata);

  GST_LOG_OBJECT (self, "got request for length %" G_GSIZE_FORMAT, length);

  /* only signal when reading */
  if (!self->in_read)
    return;

  pa_threaded_mainloop_signal (self->mainloop, 0);
}

/* Stream latency update callback: only logs the stream's current timing
 * information (or the fact that none is available). */
static void
gst_pulsesrc_stream_latency_update_cb (pa_stream * s, void *userdata)
{
  const pa_timing_info *info;
  pa_usec_t source_usec;

  info = pa_stream_get_timing_info (s);

  if (!info) {
    GST_LOG_OBJECT (GST_PULSESRC_CAST (userdata),
        "latency update (information unknown)");
    return;
  }
  source_usec = info->configured_source_usec;

  /* write_index and read_index are both signed 64-bit values, so both are
   * printed with the signed format */
  GST_LOG_OBJECT (GST_PULSESRC_CAST (userdata),
      "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
      G_GINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
      GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
      info->write_index, info->read_index_corrupt, info->read_index,
      info->source_usec, source_usec);
}

/* Stream underflow callback: only emits a warning. */
static void
gst_pulsesrc_stream_underflow_cb (pa_stream * s, void *userdata)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (userdata);

  GST_WARNING_OBJECT (self, "Got underflow");
}

/* Stream overflow callback: only emits a warning. */
static void
gst_pulsesrc_stream_overflow_cb (pa_stream * s, void *userdata)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (userdata);

  GST_WARNING_OBJECT (self, "Got overflow");
}

/* Context subscription callback: for "new" and "change" events on our own
 * source output, flags that property notifications are pending. The flag is
 * consumed by gst_pulsesrc_read(). */
static void
gst_pulsesrc_context_subscribe_cb (pa_context * c,
    pa_subscription_event_type_t t, uint32_t idx, void *userdata)
{
  GstPulseSrc *psrc = GST_PULSESRC (userdata);
  gboolean is_change, is_new;

  is_change =
      (t == (PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT | PA_SUBSCRIPTION_EVENT_CHANGE));
  is_new =
      (t == (PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT | PA_SUBSCRIPTION_EVENT_NEW));

  if (!is_change && !is_new)
    return;

  /* events about other source outputs are of no interest */
  if (psrc->source_output_idx != idx)
    return;

  /* Actually this event is also triggered when other properties of the stream
   * change that are unrelated to the volume. However it is probably cheaper to
   * signal the change here and check for the volume when the GObject property
   * is read instead of querying it always. */

  /* inform streaming thread to notify */
  g_atomic_int_compare_and_exchange (&psrc->notify, 0, 1);
}

/* GstAudioSrc::open implementation: creates a PulseAudio context on our
 * mainloop, installs the state and subscribe callbacks, connects to the
 * configured server and blocks until the context is ready.
 *
 * Returns TRUE once connected. On any failure an element error is posted,
 * the context is destroyed again and FALSE is returned. */
static gboolean
gst_pulsesrc_open (GstAudioSrc * asrc)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (asrc);
  pa_context_state_t ctx_state;

  pa_threaded_mainloop_lock (self->mainloop);

  g_assert (!self->context);
  g_assert (!self->stream);

  GST_DEBUG_OBJECT (self, "opening device");

  self->context =
      pa_context_new (pa_threaded_mainloop_get_api (self->mainloop),
      self->client_name);
  if (self->context == NULL) {
    GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to create context"),
        (NULL));
    goto open_failed;
  }

  pa_context_set_state_callback (self->context,
      gst_pulsesrc_context_state_cb, self);
  pa_context_set_subscribe_callback (self->context,
      gst_pulsesrc_context_subscribe_cb, self);

  GST_DEBUG_OBJECT (self, "connect to server %s",
      GST_STR_NULL (self->server));

  if (pa_context_connect (self->context, self->server, 0, NULL) < 0) {
    GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect: %s",
            pa_strerror (pa_context_errno (self->context))), (NULL));
    goto open_failed;
  }

  /* Wait until the context is ready; the state callback wakes us up */
  ctx_state = pa_context_get_state (self->context);
  while (PA_CONTEXT_IS_GOOD (ctx_state) && ctx_state != PA_CONTEXT_READY) {
    pa_threaded_mainloop_wait (self->mainloop);
    ctx_state = pa_context_get_state (self->context);
  }

  if (!PA_CONTEXT_IS_GOOD (ctx_state)) {
    GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect: %s",
            pa_strerror (pa_context_errno (self->context))), (NULL));
    goto open_failed;
  }

  GST_DEBUG_OBJECT (self, "connected");

  pa_threaded_mainloop_unlock (self->mainloop);

  return TRUE;

  /* ERRORS */
open_failed:
  {
    gst_pulsesrc_destroy_context (self);

    pa_threaded_mainloop_unlock (self->mainloop);

    return FALSE;
  }
}

/* GstAudioSrc::close implementation: destroys the stream and the context
 * while holding the mainloop lock. Always returns TRUE. */
static gboolean
gst_pulsesrc_close (GstAudioSrc * asrc)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (asrc);
  pa_threaded_mainloop *loop = self->mainloop;

  pa_threaded_mainloop_lock (loop);
  gst_pulsesrc_destroy_context (self);
  pa_threaded_mainloop_unlock (loop);

  return TRUE;
}

/* GstAudioSrc::unprepare implementation: destroys the record stream while
 * holding the mainloop lock and forgets any partially consumed fragment.
 * Always returns TRUE. */
static gboolean
gst_pulsesrc_unprepare (GstAudioSrc * asrc)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (asrc);
  pa_threaded_mainloop *loop = self->mainloop;

  pa_threaded_mainloop_lock (loop);
  gst_pulsesrc_destroy_stream (self);
  pa_threaded_mainloop_unlock (loop);

  /* the peeked data belonged to the stream that is gone now */
  self->read_buffer_length = 0;
  self->read_buffer = NULL;

  return TRUE;
}

/* GstAudioSrc::read implementation: copies exactly @length bytes of captured
 * audio into @data, blocking on the mainloop until enough data has arrived.
 *
 * Before reading, pending "volume", "mute" and "current-device" notifications
 * flagged by the subscribe callback are emitted.
 *
 * Returns the number of bytes copied, or (guint) -1 when the stream is not
 * connected, the element is paused, the connection died or a PulseAudio call
 * failed. @timestamp is not used. */
static guint
gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length,
    GstClockTime * timestamp)
{
  GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
  size_t sum = 0;

  if (g_atomic_int_compare_and_exchange (&pulsesrc->notify, 1, 0)) {
    g_object_notify (G_OBJECT (pulsesrc), "volume");
    g_object_notify (G_OBJECT (pulsesrc), "mute");
    g_object_notify (G_OBJECT (pulsesrc), "current-device");
  }

  pa_threaded_mainloop_lock (pulsesrc->mainloop);
  /* lets the stream request callback know that somebody is waiting */
  pulsesrc->in_read = TRUE;

  if (!pulsesrc->stream_connected)
    goto not_connected;

  if (pulsesrc->paused)
    goto was_paused;

  while (length > 0) {
    size_t l;

    GST_LOG_OBJECT (pulsesrc, "reading %u bytes", length);

    /* check if we have a leftover buffer */
    if (!pulsesrc->read_buffer) {
      for (;;) {
        if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
          goto unlock_and_fail;

        /* read all available data, we keep a pointer to the data and the length
         * and take from it what we need. */
        if (pa_stream_peek (pulsesrc->stream, &pulsesrc->read_buffer,
                &pulsesrc->read_buffer_length) < 0)
          goto peek_failed;

        GST_LOG_OBJECT (pulsesrc, "have data of %" G_GSIZE_FORMAT " bytes",
            pulsesrc->read_buffer_length);

        /* if we have data, process it */
        if (pulsesrc->read_buffer && pulsesrc->read_buffer_length)
          break;

        /* A NULL pointer with a non-zero length is a hole in the stream. It
         * has to be dropped explicitly, otherwise every following peek
         * returns the same hole again and we would wait forever. */
        if (!pulsesrc->read_buffer && pulsesrc->read_buffer_length) {
          GST_LOG_OBJECT (pulsesrc, "skipping hole of %" G_GSIZE_FORMAT
              " bytes", pulsesrc->read_buffer_length);

          if (pa_stream_drop (pulsesrc->stream) < 0)
            goto drop_failed;

          pulsesrc->read_buffer_length = 0;
          continue;
        }

        /* now wait for more data to become available */
        GST_LOG_OBJECT (pulsesrc, "waiting for data");
        pa_threaded_mainloop_wait (pulsesrc->mainloop);

        if (pulsesrc->paused)
          goto was_paused;
      }
    }

    /* take as much from the current fragment as the caller still needs */
    l = pulsesrc->read_buffer_length >
        length ? length : pulsesrc->read_buffer_length;

    memcpy (data, pulsesrc->read_buffer, l);

    pulsesrc->read_buffer = (const guint8 *) pulsesrc->read_buffer + l;
    pulsesrc->read_buffer_length -= l;

    data = (guint8 *) data + l;
    length -= l;
    sum += l;

    if (pulsesrc->read_buffer_length == 0) {
      /* we copied all of the data, drop it now */
      if (pa_stream_drop (pulsesrc->stream) < 0)
        goto drop_failed;

      /* reset pointer to data */
      pulsesrc->read_buffer = NULL;
      pulsesrc->read_buffer_length = 0;
    }
  }

  pulsesrc->in_read = FALSE;
  pa_threaded_mainloop_unlock (pulsesrc->mainloop);

  return sum;

  /* ERRORS */
not_connected:
  {
    GST_LOG_OBJECT (pulsesrc, "we are not connected");
    goto unlock_and_fail;
  }
was_paused:
  {
    GST_LOG_OBJECT (pulsesrc, "we are paused");
    goto unlock_and_fail;
  }
peek_failed:
  {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_stream_peek() failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock_and_fail;
  }
drop_failed:
  {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_stream_drop() failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock_and_fail;
  }
unlock_and_fail:
  {
    pulsesrc->in_read = FALSE;
    pa_threaded_mainloop_unlock (pulsesrc->mainloop);

    return (guint) - 1;
  }
}

/* GstAudioSrc::delay implementation: returns the current stream latency
 * converted to samples. 0 is returned when the server is dead, when the
 * latency cannot be queried or when it is reported as negative. */
static guint
gst_pulsesrc_delay (GstAudioSrc * asrc)
{
  GstPulseSrc *self = GST_PULSESRC_CAST (asrc);
  pa_usec_t latency_usec;
  int is_negative, rc;

  pa_threaded_mainloop_lock (self->mainloop);

  if (gst_pulsesrc_is_dead (self, TRUE)) {
    GST_DEBUG_OBJECT (self, "the server is dead");
    pa_threaded_mainloop_unlock (self->mainloop);
    return 0;
  }

  /* get the latency, this can fail when we don't have a latency update yet.
   * We don't want to wait for latency updates here but we just return 0. */
  rc = pa_stream_get_latency (self->stream, &latency_usec, &is_negative);

  pa_threaded_mainloop_unlock (self->mainloop);

  if (rc < 0) {
    GST_DEBUG_OBJECT (self, "could not get latency");
    return 0;
  }

  if (is_negative)
    return 0;

  /* microseconds -> samples at the stream's sample rate */
  return (guint) ((latency_usec * self->sample_spec.rate) / 1000000LL);
}

/* Create (or re-create) the PulseAudio record stream object and install its
 * callbacks. The stream is only created here, it is not connected.
 *
 * Two calling modes are handled:
 *  - rspec != NULL: an already negotiated ringbuffer spec is used as is and
 *    the channel map is derived from it.
 *  - rspec == NULL, caps != NULL: *caps (fixed caps) is parsed into a local
 *    spec; after the stream exists, *caps is replaced by caps built from the
 *    channel map the stream actually reports.
 *
 * Returns TRUE on success. On failure an element error is posted and FALSE is
 * returned.
 *
 * NOTE(review): the callers visible in this file pass exactly one of caps /
 * rspec. If both were non-NULL, the `if (caps)` block after stream creation
 * would read new_spec, which is only initialised in the caps-only branch. */
static gboolean
gst_pulsesrc_create_stream (GstPulseSrc * pulsesrc, GstCaps ** caps,
    GstAudioRingBufferSpec * rspec)
{
  pa_channel_map channel_map;
  const pa_channel_map *m;
  GstStructure *s;
  gboolean need_channel_layout = FALSE;
  GstAudioRingBufferSpec new_spec, *spec = NULL;
  const gchar *name;
  int i;

  /* If we already have a stream (renegotiation), free it first */
  if (pulsesrc->stream)
    gst_pulsesrc_destroy_stream (pulsesrc);

  if (rspec) {
    /* Post-negotiation, we already have a ringbuffer spec, so we just need to
     * use it to create a stream. */
    spec = rspec;

    /* At this point, we expect the channel-mask to be set in caps, so we just
     * use that */
    if (!gst_pulse_gst_to_channel_map (&channel_map, spec))
      goto invalid_spec;

  } else if (caps) {
    /* At negotiation time, we get a fixed caps and use it to set up a stream */
    s = gst_caps_get_structure (*caps, 0);
    /* The channel count is read into new_spec before new_spec is zeroed
     * below; it is only needed for the mono/stereo decision right here.
     * NOTE(review): the return value is not checked, so this relies on the
     * caps carrying a "channels" field. */
    gst_structure_get_int (s, "channels", &new_spec.info.channels);
    if (!gst_structure_has_field (s, "channel-mask")) {
      if (new_spec.info.channels == 1) {
        pa_channel_map_init_mono (&channel_map);
      } else if (new_spec.info.channels == 2) {
        pa_channel_map_init_stereo (&channel_map);
      } else {
        /* More than two channels and no mask: remember that no channel map
         * can be handed to PulseAudio and put a zero mask into the caps so
         * they can be parsed. */
        need_channel_layout = TRUE;
        gst_structure_set (s, "channel-mask", GST_TYPE_BITMASK,
            G_GUINT64_CONSTANT (0), NULL);
      }
    }

    memset (&new_spec, 0, sizeof (GstAudioRingBufferSpec));
    new_spec.latency_time = GST_SECOND;
    if (!gst_audio_ring_buffer_parse_caps (&new_spec, *caps))
      goto invalid_caps;

    /* Keep the refcount of the caps at 1 to make them writable */
    gst_caps_unref (new_spec.caps);

    /* The caps had a mask but it could not be turned into a PulseAudio
     * channel map: fall back to "no layout" and invalidate the positions. */
    if (!need_channel_layout
        && !gst_pulse_gst_to_channel_map (&channel_map, &new_spec)) {
      need_channel_layout = TRUE;
      gst_structure_set (s, "channel-mask", GST_TYPE_BITMASK,
          G_GUINT64_CONSTANT (0), NULL);
      for (i = 0; i < G_N_ELEMENTS (new_spec.info.position); i++)
        new_spec.info.position[i] = GST_AUDIO_CHANNEL_POSITION_INVALID;
    }

    spec = &new_spec;
  } else {
    /* !rspec && !caps */
    g_assert_not_reached ();
  }

  if (!gst_pulse_fill_sample_spec (spec, &pulsesrc->sample_spec))
    goto invalid_spec;

  /* The mainloop lock is held from here until the stream callbacks are
   * installed (or until the unlock_and_fail path). */
  pa_threaded_mainloop_lock (pulsesrc->mainloop);

  if (!pulsesrc->context)
    goto bad_context;

  /* When need_channel_layout is set, NULL is passed instead of channel_map,
   * which has not been initialised in that case. */
  name = "Record Stream";
  if (pulsesrc->proplist) {
    if (!(pulsesrc->stream = pa_stream_new_with_proplist (pulsesrc->context,
                name, &pulsesrc->sample_spec,
                (need_channel_layout) ? NULL : &channel_map,
                pulsesrc->proplist)))
      goto create_failed;

  } else if (!(pulsesrc->stream = pa_stream_new (pulsesrc->context,
              name, &pulsesrc->sample_spec,
              (need_channel_layout) ? NULL : &channel_map)))
    goto create_failed;

  if (caps) {
    /* Read back the channel map the stream ended up with and replace *caps
     * with caps generated from it, so the caller sees the final layout. */
    m = pa_stream_get_channel_map (pulsesrc->stream);
    gst_pulse_channel_map_to_gst (m, &new_spec);
    gst_audio_channel_positions_to_valid_order (new_spec.info.position,
        new_spec.info.channels);
    gst_caps_unref (*caps);
    *caps = gst_audio_info_to_caps (&new_spec.info);

    GST_DEBUG_OBJECT (pulsesrc, "Caps are %" GST_PTR_FORMAT, *caps);
  }


  /* Hook up all stream callbacks with the element as user data */
  pa_stream_set_state_callback (pulsesrc->stream, gst_pulsesrc_stream_state_cb,
      pulsesrc);
  pa_stream_set_read_callback (pulsesrc->stream, gst_pulsesrc_stream_request_cb,
      pulsesrc);
  pa_stream_set_underflow_callback (pulsesrc->stream,
      gst_pulsesrc_stream_underflow_cb, pulsesrc);
  pa_stream_set_overflow_callback (pulsesrc->stream,
      gst_pulsesrc_stream_overflow_cb, pulsesrc);
  pa_stream_set_latency_update_callback (pulsesrc->stream,
      gst_pulsesrc_stream_latency_update_cb, pulsesrc);

  pa_threaded_mainloop_unlock (pulsesrc->mainloop);

  return TRUE;

  /* ERRORS */
  /* invalid_caps / invalid_spec are reached before the mainloop lock is
   * taken, so they skip the unlock and jump straight to `fail`. */
invalid_caps:
  {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, SETTINGS,
        ("Can't parse caps."), (NULL));
    goto fail;
  }
invalid_spec:
  {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, SETTINGS,
        ("Invalid sample specification."), (NULL));
    goto fail;
  }
bad_context:
  {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Bad context"), (NULL));
    goto unlock_and_fail;
  }
create_failed:
  {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("Failed to create stream: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock_and_fail;
  }
unlock_and_fail:
  {
    /* Reached with the mainloop lock held; pulsesrc->stream may be NULL
     * here (bad_context / create_failed). */
    gst_pulsesrc_destroy_stream (pulsesrc);

    pa_threaded_mainloop_unlock (pulsesrc->mainloop);

  fail:
    return FALSE;
  }
}

/* GstBaseSrc::event handler.
 *
 * For RECONFIGURE events gst_pad_check_reconfigure() is called on the source
 * pad (its result is not used here); every event is then handed to the parent
 * class, whose return value is passed through. */
static gboolean
gst_pulsesrc_event (GstBaseSrc * basesrc, GstEvent * event)
{
  GST_DEBUG_OBJECT (basesrc, "handle event %" GST_PTR_FORMAT, event);

  if (GST_EVENT_TYPE (event) == GST_EVENT_RECONFIGURE)
    gst_pad_check_reconfigure (GST_BASE_SRC_PAD (basesrc));

  return GST_BASE_SRC_CLASS (parent_class)->event (basesrc, event);
}

/* Caps negotiation, modelled on gst_base_src_negotiate_default().
 *
 * The difference is that fixed caps are routed through
 * gst_pulsesrc_create_stream(), which may rewrite them, so that caps with
 * more than two channels are guaranteed to carry a channel layout before
 * they are set on the pad.
 *
 * Returns TRUE when negotiation succeeded or was not needed. */
static gboolean
gst_pulsesrc_negotiate (GstBaseSrc * basesrc)
{
  GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (basesrc);
  GstCaps *own_caps;
  GstCaps *peer_caps = NULL;
  GstCaps *candidate = NULL;
  gboolean ok = FALSE;

  /* what can our own source pad produce? */
  own_caps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
  GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, own_caps);

  /* no caps at all, or ANY caps: there is nothing to negotiate */
  if (own_caps == NULL || gst_caps_is_any (own_caps)) {
    GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
    if (own_caps)
      gst_caps_unref (own_caps);
    return TRUE;
  }

  /* restrict our caps to what the peer accepts, if there is a peer answer */
  peer_caps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
  GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peer_caps);
  if (peer_caps != NULL) {
    candidate = gst_caps_intersect (own_caps, peer_caps);
    GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, candidate);
    gst_caps_unref (own_caps);
    gst_caps_unref (peer_caps);
  } else {
    /* ownership of own_caps moves to candidate */
    candidate = own_caps;
  }

  if (candidate == NULL)
    return FALSE;

  /* the caps are sorted, so the first structure is the preferred one */
  candidate = gst_caps_truncate (candidate);

  if (!gst_caps_is_empty (candidate)) {
    candidate = GST_BASE_SRC_CLASS (parent_class)->fixate (basesrc, candidate);
    GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, candidate);

    if (gst_caps_is_any (candidate)) {
      /* still ANY after fixation: the element accepts anything */
      ok = TRUE;
    } else if (gst_caps_is_fixed (candidate)) {
      /* create the stream first; it may replace candidate with caps that
       * describe the channel layout actually in use */
      ok = gst_pulsesrc_create_stream (pulsesrc, &candidate, NULL);
      if (ok)
        ok = gst_base_src_set_caps (basesrc, candidate);
    }
  }

  gst_caps_unref (candidate);
  return ok;
}

/* GstAudioSrc::prepare implementation: connect the record stream and size
 * the ringbuffer.
 *
 * Creates the stream from @spec if negotiation did not already create one,
 * pushes the stream's channel positions to the ringbuffer, subscribes to
 * source-output events, connects the stream (corked) for recording and waits
 * until it is ready. Pending mute/volume settings are applied afterwards and
 * spec->segsize / spec->segtotal are adjusted to the buffer attributes the
 * server actually granted.
 *
 * Returns TRUE on success. On failure an element error has been posted, the
 * stream is destroyed and FALSE is returned. */
static gboolean
gst_pulsesrc_prepare (GstAudioSrc * asrc, GstAudioRingBufferSpec * spec)
{
  pa_buffer_attr wanted;
  const pa_buffer_attr *actual;
  GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
  pa_stream_flags_t flags;
  pa_operation *o;
  GstAudioClock *clock;

  pa_threaded_mainloop_lock (pulsesrc->mainloop);

  /* Create the stream if negotiation has not done so. This must be checked:
   * on failure pulsesrc->stream stays NULL and every pa_stream_*() call
   * below would operate on a NULL stream. gst_pulsesrc_create_stream() has
   * already posted the element error in that case. */
  if (!pulsesrc->stream
      && !gst_pulsesrc_create_stream (pulsesrc, NULL, spec))
    goto unlock_and_fail;

  {
    /* work on a copy so the caller's spec is not modified here */
    GstAudioRingBufferSpec s = *spec;
    const pa_channel_map *m;

    m = pa_stream_get_channel_map (pulsesrc->stream);
    gst_pulse_channel_map_to_gst (m, &s);
    gst_audio_ring_buffer_set_channel_positions (GST_AUDIO_BASE_SRC
        (pulsesrc)->ringbuffer, s.info.position);
  }

  /* enable event notifications */
  GST_LOG_OBJECT (pulsesrc, "subscribing to context events");
  if (!(o = pa_context_subscribe (pulsesrc->context,
              PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT, NULL, NULL))) {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_context_subscribe() failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock_and_fail;
  }

  pa_operation_unref (o);

  /* There's a bit of a disconnect here between the audio ringbuffer and what
   * PulseAudio provides. The audio ringbuffer provides a total of buffer_time
   * worth of buffering, divided into segments of latency_time size. We're
   * asking PulseAudio to provide a total latency of latency_time, which, with
   * PA_STREAM_ADJUST_LATENCY, effectively sets itself up as a ringbuffer with
   * one segment being the hardware buffer, and the other the software buffer.
   * This segment size is returned as the fragsize.
   *
   * Since the two concepts don't map very well, what we do is keep segsize as
   * it is (unless fragsize is even larger, in which case we use that). We'll
   * get data from PulseAudio in smaller chunks than we want to pass on as an
   * element, and we coalesce those chunks in the ringbuffer memory and pass it
   * on in the expected chunk size. */
  wanted.maxlength = spec->segsize * spec->segtotal;
  wanted.tlength = -1;
  wanted.prebuf = 0;
  wanted.minreq = -1;
  wanted.fragsize = spec->segsize;

  GST_INFO_OBJECT (pulsesrc, "maxlength: %d", wanted.maxlength);
  GST_INFO_OBJECT (pulsesrc, "tlength:   %d", wanted.tlength);
  GST_INFO_OBJECT (pulsesrc, "prebuf:    %d", wanted.prebuf);
  GST_INFO_OBJECT (pulsesrc, "minreq:    %d", wanted.minreq);
  GST_INFO_OBJECT (pulsesrc, "fragsize:  %d", wanted.fragsize);

  /* The stream is connected corked; it is uncorked at the end of this
   * function unless the element is paused. */
  flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
      PA_STREAM_NOT_MONOTONIC | PA_STREAM_ADJUST_LATENCY |
      PA_STREAM_START_CORKED;

  if (pa_stream_connect_record (pulsesrc->stream, pulsesrc->device, &wanted,
          flags) < 0) {
    goto connect_failed;
  }

  /* our clock will now start from 0 again */
  clock = GST_AUDIO_CLOCK (GST_AUDIO_BASE_SRC (pulsesrc)->clock);
  gst_audio_clock_reset (clock, 0);

  pulsesrc->corked = TRUE;

  for (;;) {
    pa_stream_state_t state;

    state = pa_stream_get_state (pulsesrc->stream);

    if (!PA_STREAM_IS_GOOD (state))
      goto stream_is_bad;

    if (state == PA_STREAM_READY)
      break;

    /* Wait until the stream is ready */
    pa_threaded_mainloop_wait (pulsesrc->mainloop);
  }
  pulsesrc->stream_connected = TRUE;

  /* store the source output index so it can be accessed via a property */
  pulsesrc->source_output_idx = pa_stream_get_index (pulsesrc->stream);
  g_object_notify (G_OBJECT (pulsesrc), "source-output-index");

  /* Although source output stream muting is supported, there is a bug in
   * PulseAudio that doesn't allow us to do this at startup, so we mute
   * manually post-connect. This should be moved back pre-connect once things
   * are fixed on the PulseAudio side. */
  if (pulsesrc->mute_set && pulsesrc->mute) {
    gst_pulsesrc_set_stream_mute (pulsesrc, pulsesrc->mute);
    pulsesrc->mute_set = FALSE;
  }

  if (pulsesrc->volume_set) {
    gst_pulsesrc_set_stream_volume (pulsesrc, pulsesrc->volume);
    pulsesrc->volume_set = FALSE;
  }

  /* get the actual buffering properties now */
  actual = pa_stream_get_buffer_attr (pulsesrc->stream);

  GST_INFO_OBJECT (pulsesrc, "maxlength: %d", actual->maxlength);
  GST_INFO_OBJECT (pulsesrc, "tlength:   %d (wanted: %d)",
      actual->tlength, wanted.tlength);
  GST_INFO_OBJECT (pulsesrc, "prebuf:    %d", actual->prebuf);
  GST_INFO_OBJECT (pulsesrc, "minreq:    %d (wanted %d)", actual->minreq,
      wanted.minreq);
  GST_INFO_OBJECT (pulsesrc, "fragsize:  %d (wanted %d)",
      actual->fragsize, wanted.fragsize);

  if (actual->fragsize >= spec->segsize) {
    spec->segsize = actual->fragsize;
  } else {
    /* fragsize is smaller than what we wanted, so let the read function
     * coalesce the smaller chunks as they come in */
  }

  /* Fix up the total ringbuffer size based on what we actually got */
  spec->segtotal = actual->maxlength / spec->segsize;
  /* Don't buffer less than 2 segments as the ringbuffer can't deal with it */
  if (spec->segtotal < 2)
    spec->segtotal = 2;

  if (!pulsesrc->paused) {
    GST_DEBUG_OBJECT (pulsesrc, "uncorking because we are playing");
    gst_pulsesrc_set_corked (pulsesrc, FALSE, FALSE);
  }
  pa_threaded_mainloop_unlock (pulsesrc->mainloop);

  return TRUE;

  /* ERRORS */
connect_failed:
  {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("Failed to connect stream: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock_and_fail;
  }
stream_is_bad:
  {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("Failed to connect stream: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto unlock_and_fail;
  }
unlock_and_fail:
  {
    /* also reached when stream creation failed, i.e. with a NULL stream */
    gst_pulsesrc_destroy_stream (pulsesrc);

    pa_threaded_mainloop_unlock (pulsesrc->mainloop);
    return FALSE;
  }
}

/* Completion callback for stream operations (used for flush and cork).
 *
 * Records the outcome, normalised to 0/1, in operation_success and signals
 * the mainloop so that a thread blocked in pa_threaded_mainloop_wait()
 * wakes up. */
static void
gst_pulsesrc_success_cb (pa_stream * s, int success, void *userdata)
{
  GstPulseSrc *src = GST_PULSESRC_CAST (userdata);

  src->operation_success = (success != 0);
  pa_threaded_mainloop_signal (src->mainloop, 0);
}

/* GstAudioSrc::reset implementation: flush the record stream.
 *
 * Issues pa_stream_flush(), marks the element as paused, wakes a read that
 * may be blocked on the mainloop and then waits for the flush to complete.
 * Failures are reported as element errors; nothing is returned. */
static void
gst_pulsesrc_reset (GstAudioSrc * asrc)
{
  GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
  pa_operation *flush_op = NULL;

  pa_threaded_mainloop_lock (pulsesrc->mainloop);
  GST_DEBUG_OBJECT (pulsesrc, "reset");

  if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
    goto done;

  flush_op =
      pa_stream_flush (pulsesrc->stream, gst_pulsesrc_success_cb, pulsesrc);
  if (flush_op == NULL) {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
        ("pa_stream_flush() failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
    goto done;
  }

  pulsesrc->paused = TRUE;
  /* A read blocked in pa_threaded_mainloop_wait() has to notice the paused
   * flag, so wake it up */
  if (pulsesrc->in_read)
    pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);

  /* gst_pulsesrc_success_cb() overwrites this once the flush completes */
  pulsesrc->operation_success = FALSE;
  for (;;) {
    if (pa_operation_get_state (flush_op) != PA_OPERATION_RUNNING)
      break;

    if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
      goto done;

    pa_threaded_mainloop_wait (pulsesrc->mainloop);
  }

  if (!pulsesrc->operation_success) {
    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Flush failed: %s",
            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
  }

done:
  /* shared exit for success and failure: release the operation, if any */
  if (flush_op != NULL) {
    pa_operation_cancel (flush_op);
    pa_operation_unref (flush_op);
  }

  pa_threaded_mainloop_unlock (pulsesrc->mainloop);
}

/* Change the corked state of the stream. The caller must hold the mainloop
 * lock.
 *
 * Nothing is done (and TRUE is returned) when the stream is not connected or
 * is already in the requested state. When @wait is TRUE the call blocks on
 * the mainloop until the cork operation is no longer running.
 *
 * Returns FALSE when pa_stream_cork() fails or the server dies while
 * waiting; in the latter case psrc->corked is left untouched. */
static gboolean
gst_pulsesrc_set_corked (GstPulseSrc * psrc, gboolean corked, gboolean wait)
{
  pa_operation *cork_op;

  GST_DEBUG_OBJECT (psrc, "setting corked state to %d", corked);

  if (!psrc->stream_connected)
    return TRUE;

  if (psrc->corked == corked) {
    GST_DEBUG_OBJECT (psrc, "skipping, already in requested state");
    return TRUE;
  }

  cork_op = pa_stream_cork (psrc->stream, corked, gst_pulsesrc_success_cb,
      psrc);
  if (cork_op == NULL) {
    GST_ELEMENT_ERROR (psrc, RESOURCE, FAILED,
        ("pa_stream_cork() failed: %s",
            pa_strerror (pa_context_errno (psrc->context))), (NULL));
    return FALSE;
  }

  if (wait) {
    while (pa_operation_get_state (cork_op) == PA_OPERATION_RUNNING) {
      pa_threaded_mainloop_wait (psrc->mainloop);
      if (gst_pulsesrc_is_dead (psrc, TRUE)) {
        GST_DEBUG_OBJECT (psrc, "the server is dead");
        pa_operation_unref (cork_op);
        return FALSE;
      }
    }
  }

  /* without @wait the new state is recorded as soon as the request is sent */
  psrc->corked = corked;
  pa_operation_unref (cork_op);

  return TRUE;
}

/* Start or resume recording: clear the paused flag and request an uncork
 * without waiting for it to complete. The result of the uncork request is
 * not propagated; this always returns TRUE. */
static gboolean
gst_pulsesrc_play (GstPulseSrc * psrc)
{
  pa_threaded_mainloop *loop = psrc->mainloop;

  pa_threaded_mainloop_lock (loop);

  GST_DEBUG_OBJECT (psrc, "playing");
  psrc->paused = FALSE;
  (void) gst_pulsesrc_set_corked (psrc, FALSE, FALSE);

  pa_threaded_mainloop_unlock (loop);

  return TRUE;
}

/* Stop recording as soon as possible: set the paused flag and, if a read is
 * currently in progress, signal the mainloop so a read blocked in
 * pa_threaded_mainloop_wait() wakes up and sees the flag.
 * Always returns TRUE. */
static gboolean
gst_pulsesrc_pause (GstPulseSrc * psrc)
{
  pa_threaded_mainloop *loop = psrc->mainloop;

  pa_threaded_mainloop_lock (loop);

  GST_DEBUG_OBJECT (psrc, "pausing");

  /* the read method checks this flag after every wait */
  psrc->paused = TRUE;

  if (psrc->in_read) {
    GST_DEBUG_OBJECT (psrc, "signal read");
    pa_threaded_mainloop_signal (loop, 0);
  }

  pa_threaded_mainloop_unlock (loop);

  return TRUE;
}

static GstStateChangeReturn
gst_pulsesrc_change_state (GstElement * element, GstStateChange transition)
{
  GstStateChangeReturn ret;
  GstPulseSrc *this = GST_PULSESRC_CAST (element);

  switch (transition) {
    case GST_STATE_CHANGE_NULL_TO_READY:
      if (!(this->mainloop = pa_threaded_mainloop_new ()))
        goto mainloop_failed;
      if (pa_threaded_mainloop_start (this->mainloop) < 0) {
        pa_threaded_mainloop_free (this->mainloop);
        this->mainloop = NULL;
        goto mainloop_start_failed;
      }
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      gst_element_post_message (element,
          gst_message_new_clock_provide (GST_OBJECT_CAST (element),
              GST_AUDIO_BASE_SRC (this)->clock, TRUE));
      break;
    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
      /* uncork and start recording */
      gst_pulsesrc_play (this);
      break;
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
      /* stop recording ASAP by corking */
      pa_threaded_mainloop_lock (this->mainloop);
      GST_DEBUG_OBJECT (this, "corking");
      gst_pulsesrc_set_corked (this, TRUE, FALSE);
      pa_threaded_mainloop_unlock (this->mainloop);
      break;
    default:
      break;
  }

  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);

  switch (transition) {
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
      /* now make sure we get out of the _read method */
      gst_pulsesrc_pause (this);
      break;
    case GST_STATE_CHANGE_READY_TO_NULL:
      if (this->mainloop)
        pa_threaded_mainloop_stop (this->mainloop);

      gst_pulsesrc_destroy_context (this);

      if (this->mainloop) {
        pa_threaded_mainloop_free (this->mainloop);
        this->mainloop = NULL;
      }
      break;
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      /* format_lost is reset in release() in baseaudiosink */
      gst_element_post_message (element,
          gst_message_new_clock_lost (GST_OBJECT_CAST (element),
              GST_AUDIO_BASE_SRC (this)->clock));
      break;
    default:
      break;
  }

  return ret;

  /* ERRORS */
mainloop_failed:
  {
    GST_ELEMENT_ERROR (this, RESOURCE, FAILED,
        ("pa_threaded_mainloop_new() failed"), (NULL));
    return GST_STATE_CHANGE_FAILURE;
  }
mainloop_start_failed:
  {
    GST_ELEMENT_ERROR (this, RESOURCE, FAILED,
        ("pa_threaded_mainloop_start() failed"), (NULL));
    return GST_STATE_CHANGE_FAILURE;
  }
}

/* Clock callback: return the stream time in nanoseconds.
 *
 * Returns 0 when there is no mainloop, no stream, or the server is dead, and
 * GST_CLOCK_TIME_NONE when PulseAudio cannot provide a time. */
static GstClockTime
gst_pulsesrc_get_time (GstClock * clock, GstPulseSrc * src)
{
  pa_usec_t stream_time = 0;

  if (src->mainloop == NULL)
    return stream_time;

  pa_threaded_mainloop_lock (src->mainloop);

  if (src->stream && !gst_pulsesrc_is_dead (src, TRUE)) {
    if (pa_stream_get_time (src->stream, &stream_time) < 0) {
      GST_DEBUG_OBJECT (src, "could not get time");
      stream_time = GST_CLOCK_TIME_NONE;
    } else {
      /* microseconds -> nanoseconds */
      stream_time *= 1000;
    }
  }

  pa_threaded_mainloop_unlock (src->mainloop);

  return stream_time;
}