pulsesink: uncork fixes and use prebuf = 0

We can use prebuf = 0 to instruct pulse to not pause the stream on underflows.
This way we can remove the underflow callback. We however have to manually
uncork the stream now when we have no available space in the buffer or when we
are writing too far away from the current read_index.
This commit is contained in:
Wim Taymans 2009-04-09 17:18:54 +02:00
parent d849340e64
commit 8d58de128d

View file

@ -203,6 +203,7 @@ gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf,
pbuf->sample_spec.channels = 0;
#endif
pbuf->paused = FALSE;
pbuf->corked = TRUE;
}
@ -466,51 +467,6 @@ gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
}
}
/* we need to write empty samples to pulse so that it keeps on updating
* the clock correctly, we only start doing this on underflow */
static void
gst_pulsering_underflow_cb (pa_stream * s, void *userdata)
{
GstPulseSink *psink;
GstRingBuffer *rbuf;
GstPulseRingBuffer *pbuf;
size_t avail;
rbuf = GST_RING_BUFFER_CAST (userdata);
pbuf = GST_PULSERING_BUFFER_CAST (userdata);
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
GST_WARNING_OBJECT (psink, "got underflow");
if ((avail = pa_stream_writable_size (pbuf->stream)) > 0) {
guint segsize, towrite;
segsize = rbuf->spec.segsize;
/* we need to write empty data into the ringbuffer to make it advance the
* clock */
GST_LOG_OBJECT (psink, "writing %" G_GSIZE_FORMAT " bytes empty data",
avail);
while (avail > 0) {
towrite = MIN (avail, segsize);
if (pa_stream_write (pbuf->stream, rbuf->empty_seg, towrite,
NULL, 0, PA_SEEK_RELATIVE) < 0)
goto write_failed;
avail -= towrite;
}
}
return;
/* ERRORS */
write_failed:
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_write() failed: %s",
pa_strerror (pa_context_errno (pbuf->context))), (NULL));
return;
}
}
static void
gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
{
@ -589,15 +545,13 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
gst_pulsering_stream_state_cb, pbuf);
pa_stream_set_write_callback (pbuf->stream,
gst_pulsering_stream_request_cb, pbuf);
pa_stream_set_underflow_callback (pbuf->stream,
gst_pulsering_underflow_cb, pbuf);
/* buffering requirements */
/* buffering requirements. When setting prebuf to 0, the stream will not pause
* when we cause an underrun, which causes time to continue. */
memset (&buf_attr, 0, sizeof (buf_attr));
buf_attr.tlength = spec->segtotal * spec->segsize;
buf_attr.maxlength = buf_attr.tlength * 2;
//buf_attr.prebuf = buf_attr.tlength;
buf_attr.prebuf = spec->segsize;
buf_attr.prebuf = 0;
buf_attr.minreq = spec->segsize;
GST_INFO_OBJECT (psink, "tlength: %d", buf_attr.tlength);
@ -752,7 +706,8 @@ gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
/* update the corked state of a stream, must be called with the mainloop
* lock */
static gboolean
gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked)
gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
gboolean wait)
{
pa_operation *o = NULL;
GstPulseSink *psink;
@ -766,7 +721,7 @@ gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked)
gst_pulsering_success_cb, pbuf)))
goto cork_failed;
while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
pa_threaded_mainloop_wait (psink->mainloop);
if (gst_pulsering_is_dead (psink, pbuf))
goto server_dead;
@ -796,24 +751,22 @@ cork_failed:
}
}
/* start/resume playback ASAP */
/* start/resume playback ASAP, we don't uncork here but in the commit method */
static gboolean
gst_pulseringbuffer_start (GstRingBuffer * buf)
{
GstPulseSink *psink;
GstPulseRingBuffer *pbuf;
gboolean res;
pbuf = GST_PULSERING_BUFFER_CAST (buf);
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
GST_DEBUG_OBJECT (psink, "uncorking");
pa_threaded_mainloop_lock (psink->mainloop);
GST_DEBUG_OBJECT (psink, "starting");
pbuf->paused = FALSE;
res = gst_pulsering_set_corked (pbuf, FALSE);
pa_threaded_mainloop_unlock (psink->mainloop);
return res;
return TRUE;
}
/* pause/stop playback ASAP */
@ -827,11 +780,11 @@ gst_pulseringbuffer_pause (GstRingBuffer * buf)
pbuf = GST_PULSERING_BUFFER_CAST (buf);
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
GST_DEBUG_OBJECT (psink, "corking");
pa_threaded_mainloop_lock (psink->mainloop);
GST_DEBUG_OBJECT (psink, "pausing and corking");
/* make sure the commit method stops writing */
pbuf->paused = TRUE;
res = gst_pulsering_set_corked (pbuf, TRUE);
res = gst_pulsering_set_corked (pbuf, TRUE, FALSE);
if (pbuf->in_commit) {
/* we are waiting in a commit, signal */
GST_DEBUG_OBJECT (psink, "signal commit");
@ -856,6 +809,7 @@ gst_pulseringbuffer_stop (GstRingBuffer * buf)
pa_threaded_mainloop_lock (psink->mainloop);
pbuf->paused = TRUE;
res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
/* Inform anyone waiting in _commit() call that it shall wakeup */
if (pbuf->in_commit) {
GST_DEBUG_OBJECT (psink, "signal commit thread");
@ -978,6 +932,8 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
gboolean reverse;
gint *toprocess;
gint inr, outr;
gint64 offset;
guint bufsize;
pbuf = GST_PULSERING_BUFFER_CAST (buf);
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
@ -1003,6 +959,7 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
pbuf->in_commit = TRUE;
bps = buf->spec.bytes_per_sample;
bufsize = buf->spec.segsize * buf->spec.segtotal;
/* our toy resampler for trick modes */
reverse = out_samples < 0;
@ -1023,25 +980,50 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
if (pbuf->paused)
goto was_paused;
/* correct for sample offset against the internal clock */
offset = *sample;
if (pbuf->offset >= 0) {
if (offset > pbuf->offset)
offset -= pbuf->offset;
else
offset = 0;
} else {
if (offset > -pbuf->offset)
offset += pbuf->offset;
else
offset = 0;
}
offset = *sample * bps;
while (*toprocess > 0) {
size_t avail;
guint towrite;
gint64 offset;
GST_LOG_OBJECT (psink, "need to write %d samples", *toprocess);
GST_LOG_OBJECT (psink,
"need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
offset);
for (;;) {
/* FIXME, this is not quite right */
if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1)
goto writable_size_failed;
/* We always try to satisfy a request for data */
GST_LOG_OBJECT (psink, "writable bytes %" G_GSIZE_FORMAT, avail);
/* convert to samples, we can only deal with multiples of the
* sample size */
avail /= bps;
/* We always try to satisfy a request for data */
GST_LOG_OBJECT (psink, "writable samples %" G_GSIZE_FORMAT, avail);
if (avail > 0)
break;
/* see if we need to uncork because we have no free space */
if (pbuf->corked) {
if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
goto uncork_failed;
}
/* we can't write a single byte, wait a bit */
GST_LOG_OBJECT (psink, "waiting for free space");
pa_threaded_mainloop_wait (psink->mainloop);
@ -1053,20 +1035,6 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
if (avail > out_samples)
avail = out_samples;
/* correct for sample offset against the internal clock */
offset = *sample;
if (pbuf->offset >= 0) {
if (offset > pbuf->offset)
offset -= pbuf->offset;
else
offset = 0;
} else {
if (offset > -pbuf->offset)
offset += pbuf->offset;
else
offset = 0;
}
offset *= bps;
towrite = avail * bps;
GST_LOG_OBJECT (psink, "writing %d samples at offset %" G_GUINT64_FORMAT,
@ -1114,6 +1082,23 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
avail = towrite / bps;
}
*sample += avail;
offset = *sample * bps;
/* check if we need to uncork after writing the samples */
if (pbuf->corked) {
const pa_timing_info *info = pa_stream_get_timing_info (pbuf->stream);
GST_LOG_OBJECT (psink,
"read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT,
info->read_index, offset);
/* we uncork when the read_index is too far behind the offset we need
* to write to. */
if (info->read_index + bufsize <= offset) {
if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
goto uncork_failed;
}
}
}
/* we consumed all samples here */
data = data_end + bps;
@ -1145,6 +1130,13 @@ start_failed:
GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
return 0;
}
uncork_failed:
{
pbuf->in_commit = FALSE;
GST_ERROR_OBJECT (psink, "uncork failed");
pa_threaded_mainloop_unlock (psink->mainloop);
goto done;
}
was_paused:
{
pbuf->in_commit = FALSE;