pulsesrc: cleanups

Keep track of the paused state of the source and leave the read function when
paused.
don't wait for a latency update when the delay is not yet known but simply
return 0 instead of blocking.
Keep track of the corked state of the stream.
Fix the state changes.
This commit is contained in:
Wim Taymans 2009-07-28 18:29:07 +02:00
parent 19233e9671
commit 2d88251d9d
2 changed files with 178 additions and 106 deletions

View file

@ -271,7 +271,7 @@ gst_pulsesrc_init (GstPulseSrc * pulsesrc, GstPulseSrcClass * klass)
#endif
pulsesrc->operation_success = FALSE;
pulsesrc->did_reset = FALSE;
pulsesrc->paused = FALSE;
pulsesrc->in_read = FALSE;
pulsesrc->mainloop = pa_threaded_mainloop_new ();
@ -510,7 +510,10 @@ gst_pulsesrc_stream_request_cb (pa_stream * s, size_t length, void *userdata)
GST_LOG_OBJECT (pulsesrc, "got request for length %" G_GSIZE_FORMAT, length);
pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
if (pulsesrc->in_read) {
/* only signal when reading */
pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
}
}
static void
@ -527,8 +530,6 @@ gst_pulsesrc_stream_latency_update_cb (pa_stream * s, void *userdata)
GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
info->write_index, info->read_index_corrupt, info->read_index,
info->source_usec, info->configured_source_usec);
pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
}
static void
@ -598,14 +599,16 @@ gst_pulsesrc_open (GstAudioSrc * asrc)
g_free (name);
return TRUE;
/* ERRORS */
unlock_and_fail:
{
gst_pulsesrc_destroy_context (pulsesrc);
gst_pulsesrc_destroy_context (pulsesrc);
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
g_free (name);
return FALSE;
g_free (name);
return FALSE;
}
}
static gboolean
@ -643,43 +646,44 @@ gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length)
size_t sum = 0;
pa_threaded_mainloop_lock (pulsesrc->mainloop);
pulsesrc->in_read = TRUE;
if (pulsesrc->paused)
goto was_paused;
while (length > 0) {
size_t l;
GST_LOG_OBJECT (pulsesrc, "reading %u bytes", length);
/*check if we have a leftover buffer */
if (!pulsesrc->read_buffer) {
for (;;) {
if (gst_pulsesrc_is_dead (pulsesrc))
goto unlock_and_fail;
/* read all available data, we keep a pointer to the data and the length
* and take from it what we need. */
if (pa_stream_peek (pulsesrc->stream, &pulsesrc->read_buffer,
&pulsesrc->read_buffer_length) < 0) {
GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
("pa_stream_peek() failed: %s",
pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
goto unlock_and_fail;
}
&pulsesrc->read_buffer_length) < 0)
goto peek_failed;
GST_LOG_OBJECT (pulsesrc, "have data of %" G_GSIZE_FORMAT " bytes",
pulsesrc->read_buffer_length);
/* if we have data, process if */
if (pulsesrc->read_buffer)
if (pulsesrc->read_buffer && pulsesrc->read_buffer_length)
break;
if (pulsesrc->did_reset)
goto unlock_and_fail;
/* now wait for more data to become available */
GST_LOG_OBJECT (pulsesrc, "waiting for data");
pa_threaded_mainloop_wait (pulsesrc->mainloop);
if (pulsesrc->paused)
goto was_paused;
}
}
g_assert (pulsesrc->read_buffer && pulsesrc->read_buffer_length);
l = pulsesrc->read_buffer_length >
length ? length : pulsesrc->read_buffer_length;
@ -690,78 +694,90 @@ gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length)
data = (guint8 *) data + l;
length -= l;
sum += l;
if (pulsesrc->read_buffer_length <= 0) {
if (pa_stream_drop (pulsesrc->stream) < 0) {
GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
("pa_stream_drop() failed: %s",
pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
goto unlock_and_fail;
}
/* we copied all of the data, drop it now */
if (pa_stream_drop (pulsesrc->stream) < 0)
goto drop_failed;
/* reset pointer to data */
pulsesrc->read_buffer = NULL;
pulsesrc->read_buffer_length = 0;
}
}
pulsesrc->did_reset = FALSE;
pulsesrc->in_read = FALSE;
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
return sum;
/* ERRORS */
was_paused:
{
GST_LOG_OBJECT (pulsesrc, "we are paused");
goto unlock_and_fail;
}
peek_failed:
{
GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
("pa_stream_peek() failed: %s",
pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
goto unlock_and_fail;
}
drop_failed:
{
GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
("pa_stream_drop() failed: %s",
pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
goto unlock_and_fail;
}
unlock_and_fail:
{
pulsesrc->did_reset = FALSE;
pulsesrc->in_read = FALSE;
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
return (guint) - 1;
}
}
/* return the delay in samples */
static guint
gst_pulsesrc_delay (GstAudioSrc * asrc)
{
GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
pa_usec_t t;
int negative;
int negative, res;
guint result;
pa_threaded_mainloop_lock (pulsesrc->mainloop);
if (gst_pulsesrc_is_dead (pulsesrc))
goto server_dead;
for (;;) {
if (gst_pulsesrc_is_dead (pulsesrc))
goto unlock_and_fail;
/* get the latency, this can fail when we don't have a latency update yet.
* We don't want to wait for latency updates here but we just return 0. */
res = pa_stream_get_latency (pulsesrc->stream, &t, &negative);
if (pa_stream_get_latency (pulsesrc->stream, &t, &negative) >= 0)
break;
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
if (pa_context_errno (pulsesrc->context) != PA_ERR_NODATA) {
GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
("pa_stream_get_latency() failed: %s",
pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
goto unlock_and_fail;
}
pa_threaded_mainloop_wait (pulsesrc->mainloop);
if (res > 0) {
GST_DEBUG_OBJECT (pulsesrc, "could not get latency");
result = 0;
} else {
if (negative)
result = 0;
else
result = (guint) ((t * pulsesrc->sample_spec.rate) / 1000000LL);
}
return result;
if (negative)
t = 0;
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
return (guint) ((t * pulsesrc->sample_spec.rate) / 1000000LL);
unlock_and_fail:
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
return 0;
/* ERRORS */
server_dead:
{
GST_DEBUG_OBJECT (pulsesrc, "the server is dead");
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
return 0;
}
}
static gboolean
@ -955,6 +971,8 @@ gst_pulsesrc_prepare (GstAudioSrc * asrc, GstRingBufferSpec * spec)
goto unlock_and_fail;
}
pulsesrc->corked = TRUE;
for (;;) {
pa_stream_state_t state;
@ -995,11 +1013,12 @@ gst_pulsesrc_prepare (GstAudioSrc * asrc, GstRingBufferSpec * spec)
return TRUE;
unlock_and_fail:
{
gst_pulsesrc_destroy_stream (pulsesrc);
gst_pulsesrc_destroy_stream (pulsesrc);
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
return FALSE;
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
return FALSE;
}
}
static void
@ -1018,6 +1037,7 @@ gst_pulsesrc_reset (GstAudioSrc * asrc)
pa_operation *o = NULL;
pa_threaded_mainloop_lock (pulsesrc->mainloop);
GST_DEBUG_OBJECT (pulsesrc, "reset");
if (gst_pulsesrc_is_dead (pulsesrc))
goto unlock_and_fail;
@ -1031,9 +1051,9 @@ gst_pulsesrc_reset (GstAudioSrc * asrc)
goto unlock_and_fail;
}
pulsesrc->paused = TRUE;
/* Inform anyone waiting in _write() call that it shall wakeup */
if (pulsesrc->in_read) {
pulsesrc->did_reset = TRUE;
pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
}
@ -1062,77 +1082,127 @@ unlock_and_fail:
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
}
static void
gst_pulsesrc_pause (GstPulseSrc * pulsesrc, gboolean b)
/* update the corked state of a stream, must be called with the mainloop
* lock */
static gboolean
gst_pulsesrc_set_corked (GstPulseSrc * psrc, gboolean corked, gboolean wait)
{
pa_operation *o = NULL;
gboolean res = FALSE;
pa_threaded_mainloop_lock (pulsesrc->mainloop);
GST_DEBUG_OBJECT (psrc, "setting corked state to %d", corked);
if (psrc->corked != corked) {
if (!(o = pa_stream_cork (psrc->stream, corked,
gst_pulsesrc_success_cb, psrc)))
goto cork_failed;
if (gst_pulsesrc_is_dead (pulsesrc))
goto unlock;
if (!(o = pa_stream_cork (pulsesrc->stream, b, NULL, NULL))) {
GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
("pa_stream_cork() failed: %s",
pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
goto unlock;
while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
pa_threaded_mainloop_wait (psrc->mainloop);
if (gst_pulsesrc_is_dead (psrc))
goto server_dead;
}
psrc->corked = corked;
} else {
GST_DEBUG_OBJECT (psrc, "skipping, already in requested state");
}
res = TRUE;
while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
if (gst_pulsesrc_is_dead (pulsesrc))
goto unlock;
pa_threaded_mainloop_wait (pulsesrc->mainloop);
}
unlock:
cleanup:
if (o)
pa_operation_unref (o);
pa_threaded_mainloop_unlock (pulsesrc->mainloop);
return res;
/* ERRORS */
server_dead:
{
GST_DEBUG_OBJECT (psrc, "the server is dead");
goto cleanup;
}
cork_failed:
{
GST_ELEMENT_ERROR (psrc, RESOURCE, FAILED,
("pa_stream_cork() failed: %s",
pa_strerror (pa_context_errno (psrc->context))), (NULL));
goto cleanup;
}
}
/* start/resume playback ASAP */
static gboolean
gst_pulsesrc_play (GstPulseSrc * psrc)
{
pa_threaded_mainloop_lock (psrc->mainloop);
GST_DEBUG_OBJECT (psrc, "playing");
psrc->paused = FALSE;
gst_pulsesrc_set_corked (psrc, FALSE, FALSE);
pa_threaded_mainloop_unlock (psrc->mainloop);
return TRUE;
}
/* pause/stop playback ASAP */
static gboolean
gst_pulsesrc_pause (GstPulseSrc * psrc)
{
pa_threaded_mainloop_lock (psrc->mainloop);
GST_DEBUG_OBJECT (psrc, "pausing");
/* make sure the commit method stops writing */
psrc->paused = TRUE;
if (psrc->in_read) {
/* we are waiting in a read, signal */
GST_DEBUG_OBJECT (psrc, "signal read");
pa_threaded_mainloop_signal (psrc->mainloop, 0);
}
pa_threaded_mainloop_unlock (psrc->mainloop);
return TRUE;
}
static GstStateChangeReturn
gst_pulsesrc_change_state (GstElement * element, GstStateChange transition)
{
GstStateChangeReturn ret;
GstPulseSrc *this = GST_PULSESRC_CAST (element);
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
gst_pulsesrc_pause (this,
GST_STATE_TRANSITION_NEXT (transition) == GST_STATE_PAUSED);
break;
case GST_STATE_CHANGE_NULL_TO_READY:
if (!this->mixer)
this->mixer =
gst_pulsemixer_ctrl_new (G_OBJECT (this), this->server,
this->device, GST_PULSEMIXER_SOURCE);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
/* uncork and start recording */
gst_pulsesrc_play (this);
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
/* stop recording ASAP by corking */
pa_threaded_mainloop_lock (this->mainloop);
GST_DEBUG_OBJECT (this, "corking");
gst_pulsesrc_set_corked (this, TRUE, FALSE);
pa_threaded_mainloop_unlock (this->mainloop);
break;
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
/* now make sure we get out of the _read method */
gst_pulsesrc_pause (this);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
if (this->mixer) {
gst_pulsemixer_ctrl_free (this->mixer);
this->mixer = NULL;
}
break;
default:
;
break;
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
return GST_STATE_CHANGE_SUCCESS;
return ret;
}

View file

@ -70,8 +70,10 @@ struct _GstPulseSrc
GstPulseMixerCtrl *mixer;
GstPulseProbe *probe;
gboolean corked;
gboolean operation_success;
gboolean did_reset, in_read;
gboolean paused;
gboolean in_read;
};
struct _GstPulseSrcClass