diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c index be3da445fa..b5699150bb 100644 --- a/ext/pulse/pulsesink.c +++ b/ext/pulse/pulsesink.c @@ -203,6 +203,7 @@ gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf, pbuf->sample_spec.channels = 0; #endif + pbuf->paused = FALSE; pbuf->corked = TRUE; } @@ -466,51 +467,6 @@ gst_pulsering_stream_state_cb (pa_stream * s, void *userdata) } } -/* we need to write empty samples to pulse so that it keeps on updating - * the clock correctly, we only start doing this on underflow */ -static void -gst_pulsering_underflow_cb (pa_stream * s, void *userdata) -{ - GstPulseSink *psink; - GstRingBuffer *rbuf; - GstPulseRingBuffer *pbuf; - size_t avail; - - rbuf = GST_RING_BUFFER_CAST (userdata); - pbuf = GST_PULSERING_BUFFER_CAST (userdata); - psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - - GST_WARNING_OBJECT (psink, "got underflow"); - - if ((avail = pa_stream_writable_size (pbuf->stream)) > 0) { - guint segsize, towrite; - - segsize = rbuf->spec.segsize; - /* we need to write empty data into the ringbuffer to make it advance the - * clock */ - GST_LOG_OBJECT (psink, "writing %" G_GSIZE_FORMAT " bytes empty data", - avail); - - while (avail > 0) { - towrite = MIN (avail, segsize); - if (pa_stream_write (pbuf->stream, rbuf->empty_seg, towrite, - NULL, 0, PA_SEEK_RELATIVE) < 0) - goto write_failed; - avail -= towrite; - } - } - return; - - /* ERRORS */ -write_failed: - { - GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, - ("pa_stream_write() failed: %s", - pa_strerror (pa_context_errno (pbuf->context))), (NULL)); - return; - } -} - static void gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata) { @@ -589,15 +545,13 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) gst_pulsering_stream_state_cb, pbuf); pa_stream_set_write_callback (pbuf->stream, gst_pulsering_stream_request_cb, pbuf); - pa_stream_set_underflow_callback (pbuf->stream, - gst_pulsering_underflow_cb, pbuf); - /* buffering requirements */ + /* buffering requirements. When setting prebuf to 0, the stream will not pause + * when we cause an underrun, which causes time to continue. */ memset (&buf_attr, 0, sizeof (buf_attr)); buf_attr.tlength = spec->segtotal * spec->segsize; buf_attr.maxlength = buf_attr.tlength * 2; - //buf_attr.prebuf = buf_attr.tlength; - buf_attr.prebuf = spec->segsize; + buf_attr.prebuf = 0; buf_attr.minreq = spec->segsize; GST_INFO_OBJECT (psink, "tlength: %d", buf_attr.tlength); @@ -752,7 +706,8 @@ gst_pulsering_success_cb (pa_stream * s, int success, void *userdata) /* update the corked state of a stream, must be called with the mainloop * lock */ static gboolean -gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked) +gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked, + gboolean wait) { pa_operation *o = NULL; GstPulseSink *psink; @@ -766,7 +721,7 @@ gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked) gst_pulsering_success_cb, pbuf))) goto cork_failed; - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) { pa_threaded_mainloop_wait (psink->mainloop); if (gst_pulsering_is_dead (psink, pbuf)) goto server_dead; @@ -796,24 +751,22 @@ cork_failed: } } -/* start/resume playback ASAP */ +/* start/resume playback ASAP, we don't uncork here but in the commit method */ static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf) { GstPulseSink *psink; GstPulseRingBuffer *pbuf; - gboolean res; pbuf = GST_PULSERING_BUFFER_CAST (buf); psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - GST_DEBUG_OBJECT (psink, "uncorking"); pa_threaded_mainloop_lock (psink->mainloop); + GST_DEBUG_OBJECT (psink, "starting"); pbuf->paused = FALSE; - res = gst_pulsering_set_corked (pbuf, FALSE); pa_threaded_mainloop_unlock (psink->mainloop); - return res; + return TRUE; } /* pause/stop playback ASAP */ @@ -827,11 +780,11 @@ gst_pulseringbuffer_pause (GstRingBuffer * buf) pbuf = GST_PULSERING_BUFFER_CAST (buf); psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - GST_DEBUG_OBJECT (psink, "corking"); pa_threaded_mainloop_lock (psink->mainloop); + GST_DEBUG_OBJECT (psink, "pausing and corking"); /* make sure the commit method stops writing */ pbuf->paused = TRUE; - res = gst_pulsering_set_corked (pbuf, TRUE); + res = gst_pulsering_set_corked (pbuf, TRUE, FALSE); if (pbuf->in_commit) { /* we are waiting in a commit, signal */ GST_DEBUG_OBJECT (psink, "signal commit"); @@ -856,6 +809,7 @@ gst_pulseringbuffer_stop (GstRingBuffer * buf) pa_threaded_mainloop_lock (psink->mainloop); pbuf->paused = TRUE; + res = gst_pulsering_set_corked (pbuf, TRUE, TRUE); /* Inform anyone waiting in _commit() call that it shall wakeup */ if (pbuf->in_commit) { GST_DEBUG_OBJECT (psink, "signal commit thread"); @@ -978,6 +932,8 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, gboolean reverse; gint *toprocess; gint inr, outr; + gint64 offset; + guint bufsize; pbuf = GST_PULSERING_BUFFER_CAST (buf); psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); @@ -1003,6 +959,7 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, pbuf->in_commit = TRUE; bps = buf->spec.bytes_per_sample; + bufsize = buf->spec.segsize * buf->spec.segtotal; /* our toy resampler for trick modes */ reverse = out_samples < 0; @@ -1023,25 +980,50 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, if (pbuf->paused) goto was_paused; + /* correct for sample offset against the internal clock */ + offset = *sample; + if (pbuf->offset >= 0) { + if (offset > pbuf->offset) + offset -= pbuf->offset; + else + offset = 0; + } else { + if (offset > -pbuf->offset) + offset += pbuf->offset; + else + offset = 0; + } + offset = *sample * bps; + while (*toprocess > 0) { size_t avail; guint towrite; - gint64 offset; - GST_LOG_OBJECT (psink, "need to write %d samples", *toprocess); + GST_LOG_OBJECT (psink, + "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess, + offset); + for (;;) { + /* FIXME, this is not quite right */ if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1) goto writable_size_failed; + /* We always try to satisfy a request for data */ + GST_LOG_OBJECT (psink, "writable bytes %" G_GSIZE_FORMAT, avail); + /* convert to samples, we can only deal with multiples of the * sample size */ avail /= bps; - /* We always try to satisfy a request for data */ - GST_LOG_OBJECT (psink, "writable samples %" G_GSIZE_FORMAT, avail); if (avail > 0) break; + /* see if we need to uncork because we have no free space */ + if (pbuf->corked) { + if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE)) + goto uncork_failed; + } + /* we can't write a single byte, wait a bit */ GST_LOG_OBJECT (psink, "waiting for free space"); pa_threaded_mainloop_wait (psink->mainloop); @@ -1053,20 +1035,6 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, if (avail > out_samples) avail = out_samples; - /* correct for sample offset against the internal clock */ - offset = *sample; - if (pbuf->offset >= 0) { - if (offset > pbuf->offset) - offset -= pbuf->offset; - else - offset = 0; - } else { - if (offset > -pbuf->offset) - offset += pbuf->offset; - else - offset = 0; - } - offset *= bps; towrite = avail * bps; GST_LOG_OBJECT (psink, "writing %d samples at offset %" G_GUINT64_FORMAT, @@ -1114,6 +1082,23 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, avail = towrite / bps; } *sample += avail; + offset = *sample * bps; + + /* check if we need to uncork after writing the samples */ + if (pbuf->corked) { + const pa_timing_info *info = pa_stream_get_timing_info (pbuf->stream); + + GST_LOG_OBJECT (psink, + "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT, + info->read_index, offset); + + /* we uncork when the read_index is too far behind the offset we need + * to write to. */ + if (info->read_index + bufsize <= offset) { + if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE)) + goto uncork_failed; + } + } } /* we consumed all samples here */ data = data_end + bps; @@ -1145,6 +1130,13 @@ start_failed: GST_LOG_OBJECT (psink, "failed to start the ringbuffer"); return 0; } +uncork_failed: + { + pbuf->in_commit = FALSE; + GST_ERROR_OBJECT (psink, "uncork failed"); + pa_threaded_mainloop_unlock (psink->mainloop); + goto done; + } was_paused: { pbuf->in_commit = FALSE;