From 8855ed90c0bc0361bf4e7386b4a6c28dd108d403 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 9 Apr 2009 12:13:44 +0200 Subject: [PATCH] pulsesink: add beginnings of pull-based scheduling --- ext/pulse/pulsesink.c | 135 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 116 insertions(+), 19 deletions(-) diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c index 7e4f0014f4..75e350443d 100644 --- a/ext/pulse/pulsesink.c +++ b/ext/pulse/pulsesink.c @@ -470,6 +470,99 @@ gst_pulsering_stream_state_cb (pa_stream * s, void *userdata) } } +static void +gst_pulsering_pull (GstPulseSink * psink, GstPulseRingBuffer * pbuf) +{ + GstBaseSink *basesink; + GstBaseAudioSink *sink; + GstBuffer *buf; + GstRingBuffer *rbuf; + GstFlowReturn ret; + guint len; + + basesink = GST_BASE_SINK (psink); + sink = GST_BASE_AUDIO_SINK (psink); + rbuf = GST_RING_BUFFER_CAST (pbuf); + + GST_PAD_STREAM_LOCK (basesink->sinkpad); + + len = 882; + + /* would be nice to arrange for pad_alloc_buffer to return data -- as it is we + will copy twice, once into data, once into DMA */ + GST_LOG_OBJECT (basesink, "pulling %d bytes offset %" G_GUINT64_FORMAT + " to fill audio buffer", len, basesink->offset); + ret = + gst_pad_pull_range (basesink->sinkpad, basesink->segment.last_stop, len, + &buf); + + if (ret != GST_FLOW_OK) { + if (ret == GST_FLOW_UNEXPECTED) + goto eos; + else + goto error; + } + + GST_PAD_PREROLL_LOCK (basesink->sinkpad); + if (basesink->flushing) + goto flushing; + + /* complete preroll and wait for PLAYING */ + ret = gst_base_sink_do_preroll (basesink, GST_MINI_OBJECT_CAST (buf)); + if (ret != GST_FLOW_OK) + goto preroll_error; + + if (len != GST_BUFFER_SIZE (buf)) { + GST_INFO_OBJECT (basesink, + "got different size than requested from sink pad: %u != %u", len, + GST_BUFFER_SIZE (buf)); + len = MIN (GST_BUFFER_SIZE (buf), len); + } + basesink->segment.last_stop += len; + + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); + + GST_PAD_STREAM_UNLOCK (basesink->sinkpad); + + return; + +error: + { + GST_WARNING_OBJECT (basesink, "Got flow '%s' but can't return it: %d", + gst_flow_get_name (ret), ret); + gst_ring_buffer_pause (rbuf); + GST_PAD_STREAM_UNLOCK (basesink->sinkpad); + return; + } +eos: + { + /* FIXME: this is not quite correct; we'll be called endlessly until + * the sink gets shut down; maybe we should set a flag somewhere, or + * set segment.stop and segment.duration to the last sample or so */ + GST_DEBUG_OBJECT (sink, "EOS"); + gst_ring_buffer_pause (rbuf); + gst_element_post_message (GST_ELEMENT_CAST (sink), + gst_message_new_eos (GST_OBJECT_CAST (sink))); + GST_PAD_STREAM_UNLOCK (basesink->sinkpad); + } +flushing: + { + GST_DEBUG_OBJECT (sink, "we are flushing"); + gst_ring_buffer_pause (rbuf); + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); + GST_PAD_STREAM_UNLOCK (basesink->sinkpad); + return; + } +preroll_error: + { + GST_DEBUG_OBJECT (sink, "error %s", gst_flow_get_name (ret)); + gst_ring_buffer_pause (rbuf); + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); + GST_PAD_STREAM_UNLOCK (basesink->sinkpad); + return; + } +} + static void gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata) { @@ -481,7 +574,10 @@ gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata) GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length); - if (pbuf->in_commit) { + if (GST_RING_BUFFER_CAST (pbuf)->callback) { + /* in pull mode */ + gst_pulsering_pull (psink, pbuf); + } else if (pbuf->in_commit) { pa_threaded_mainloop_signal (psink->mainloop, 0); } } @@ -586,23 +682,6 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) &buf_attr, flags, pv, NULL) < 0) goto connect_failed; - for (;;) { - pa_stream_state_t state; - - state = pa_stream_get_state (pbuf->stream); - - GST_LOG_OBJECT (psink, "stream state is now %d", state); - - if (!PA_STREAM_IS_GOOD (state)) - goto connect_failed; - - if (state == PA_STREAM_READY) - break; - - /* Wait until the stream is ready */ - pa_threaded_mainloop_wait (psink->mainloop); - } - /* our clock will now start from 0 again */ clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock); gst_audio_clock_reset (clock, 0); @@ -620,6 +699,22 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) pbuf->sample_spec.rate, GST_SECOND); GST_LOG_OBJECT (psink, "sample offset %" G_GINT64_FORMAT, pbuf->offset); + for (;;) { + pa_stream_state_t state; + + state = pa_stream_get_state (pbuf->stream); + + GST_LOG_OBJECT (psink, "stream state is now %d", state); + + if (!PA_STREAM_IS_GOOD (state)) + goto connect_failed; + + if (state == PA_STREAM_READY) + break; + + /* Wait until the stream is ready */ + pa_threaded_mainloop_wait (psink->mainloop); + } GST_LOG_OBJECT (psink, "stream is acquired now"); @@ -701,6 +796,8 @@ gst_pulseringbuffer_activate (GstRingBuffer * buf, gboolean active) psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + GST_DEBUG_OBJECT (psink, "activating"); + return TRUE; } @@ -1338,7 +1435,7 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) g_assert ((pulsesink->mainloop = pa_threaded_mainloop_new ())); g_assert (pa_threaded_mainloop_start (pulsesink->mainloop) == 0); -// GST_BASE_SINK (pulsesink)->can_activate_pull = TRUE; + GST_BASE_SINK (pulsesink)->can_activate_pull = TRUE; pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE); /* TRUE for sinks, FALSE for sources */