diff --git a/ChangeLog b/ChangeLog index 4374ccdcd1..5dbe2227eb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +2008-10-16 Wim Taymans + + * docs/libs/gstreamer-libs-sections.txt: + * libs/gst/base/gstbasesink.c: (gst_base_sink_do_preroll), + (gst_base_sink_flush_start), (gst_base_sink_flush_stop), + (gst_base_sink_event), (gst_base_sink_perform_seek), + (gst_base_sink_loop), (gst_base_sink_pad_activate_pull), + (gst_base_sink_send_event), (gst_base_sink_change_state): + * libs/gst/base/gstbasesink.h: + Add method to commit the state in subclasses. + Refactor the flush_start and flush_stop code because we need it for + flushing while seeking too. + Implement the beginnings of seeking in pull mode. + Use the segment last_stop field for the pulling offset. + Fix the pause method in pull mode. + Configure the segment to BYTES for pull mode. + API: GstBaseSink::gst_base_sink_do_preroll() + 2008-10-16 Wim Taymans * libs/gst/base/gstbasesrc.c: (gst_base_src_class_init): diff --git a/docs/libs/gstreamer-libs-sections.txt b/docs/libs/gstreamer-libs-sections.txt index 0a8766da9b..c96925af57 100644 --- a/docs/libs/gstreamer-libs-sections.txt +++ b/docs/libs/gstreamer-libs-sections.txt @@ -258,6 +258,7 @@ GstBaseSinkClass gst_base_sink_query_latency gst_base_sink_get_latency +gst_base_sink_do_preroll gst_base_sink_wait_preroll gst_base_sink_wait_clock gst_base_sink_wait_eos diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index 6d058c6ea2..10af7e4ae2 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -1707,6 +1707,61 @@ stopping: } } +/** + * gst_base_sink_do_preroll: + * @sink: the sink + * @obj: the object that caused the preroll + * + * If the @sink spawns its own thread for pulling buffers from upstream it + * should call this method after it has pulled a buffer. If the element needed + * to preroll, this function will perform the preroll and will then block + * until the element state is changed. + * + * This function should be called with the PREROLL_LOCK held. + * + * Since 0.10.22 + * + * Returns: #GST_FLOW_OK if the preroll completed and processing can + * continue. Any other return value should be returned from the render vmethod. + */ +GstFlowReturn +gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj) +{ + GstFlowReturn ret; + + while (G_UNLIKELY (sink->need_preroll)) { + GST_DEBUG_OBJECT (sink, "prerolling object %p", obj); + + if (G_LIKELY (sink->playing_async)) { + /* commit state */ + if (G_UNLIKELY (!gst_base_sink_commit_state (sink))) + goto stopping; + } + + /* need to recheck here because the commit state could have + * made us not need the preroll anymore */ + if (G_LIKELY (sink->need_preroll)) { + /* block until the state changes, or we get a flush, or something */ + ret = gst_base_sink_wait_preroll (sink); + if (ret != GST_FLOW_OK) + goto flushing; + } + } + return GST_FLOW_OK; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (sink, "we are flushing"); + return ret; + } +stopping: + { + GST_DEBUG_OBJECT (sink, "stopping while commiting state"); + return GST_FLOW_WRONG_STATE; + } +} + /** * gst_base_sink_wait_eos: * @sink: the sink @@ -2529,6 +2584,52 @@ was_eos: } } +static void +gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad) +{ + /* make sure we are not blocked on the clock also clear any pending + * eos state. */ + gst_base_sink_set_flushing (basesink, pad, TRUE); + + /* we grab the stream lock but that is not needed since setting the + * sink to flushing would make sure no state commit is being done + * anymore */ + GST_PAD_STREAM_LOCK (pad); + gst_base_sink_reset_qos (basesink); + if (basesink->priv->async_enabled) { + /* and we need to commit our state again on the next + * prerolled buffer */ + basesink->playing_async = TRUE; + gst_element_lost_state (GST_ELEMENT_CAST (basesink)); + } else { + basesink->priv->have_latency = TRUE; + basesink->need_preroll = FALSE; + } + gst_base_sink_set_last_buffer (basesink, NULL); + GST_PAD_STREAM_UNLOCK (pad); +} + +static void +gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad) +{ + /* unset flushing so we can accept new data, this also flushes out any EOS + * event. */ + gst_base_sink_set_flushing (basesink, pad, FALSE); + + /* for position reporting */ + GST_OBJECT_LOCK (basesink); + basesink->priv->current_sstart = -1; + basesink->priv->current_sstop = -1; + basesink->priv->eos_rtime = -1; + if (basesink->pad_mode == GST_ACTIVATE_PUSH) { + /* we need new segment info after the flush. */ + basesink->have_newsegment = FALSE; + gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED); + } + GST_OBJECT_UNLOCK (basesink); +} + static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event) { @@ -2612,26 +2713,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) GST_DEBUG_OBJECT (basesink, "flush-start %p", event); - /* make sure we are not blocked on the clock also clear any pending - * eos state. */ - gst_base_sink_set_flushing (basesink, pad, TRUE); - - /* we grab the stream lock but that is not needed since setting the - * sink to flushing would make sure no state commit is being done - * anymore */ - GST_PAD_STREAM_LOCK (pad); - gst_base_sink_reset_qos (basesink); - if (basesink->priv->async_enabled) { - /* and we need to commit our state again on the next - * prerolled buffer */ - basesink->playing_async = TRUE; - gst_element_lost_state (GST_ELEMENT_CAST (basesink)); - } else { - basesink->priv->have_latency = TRUE; - basesink->need_preroll = FALSE; - } - gst_base_sink_set_last_buffer (basesink, NULL); - GST_PAD_STREAM_UNLOCK (pad); + gst_base_sink_flush_start (basesink, pad); gst_event_unref (event); break; @@ -2641,22 +2723,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) GST_DEBUG_OBJECT (basesink, "flush-stop %p", event); - /* unset flushing so we can accept new data, this also flushes out any EOS - * event. */ - gst_base_sink_set_flushing (basesink, pad, FALSE); - - /* for position reporting */ - GST_OBJECT_LOCK (basesink); - basesink->priv->current_sstart = -1; - basesink->priv->current_sstop = -1; - basesink->priv->eos_rtime = -1; - basesink->have_newsegment = FALSE; - GST_OBJECT_UNLOCK (basesink); - - /* we need new segment info after the flush. */ - gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); - gst_segment_init (basesink->abidata.ABI.clip_segment, - GST_FORMAT_UNDEFINED); + gst_base_sink_flush_stop (basesink, pad); gst_event_unref (event); break; @@ -2864,6 +2931,52 @@ wrong_mode: } } +/* perform a seek, only executed in pull mode */ +static gboolean +gst_base_sink_perform_seek (GstBaseSink * basesink, GstPad * pad, + GstEvent * event) +{ + gboolean flush; + gdouble rate; + GstFormat seek_format; + GstSeekFlags flags; + GstSeekType cur_type, stop_type; + gint64 cur, stop; + + if (event) { + GST_DEBUG_OBJECT (basesink, "performing seek with event %p", event); + gst_event_parse_seek (event, &rate, &seek_format, &flags, + &cur_type, &cur, &stop_type, &stop); + + flush = flags & GST_SEEK_FLAG_FLUSH; + } else { + GST_DEBUG_OBJECT (basesink, "performing seek without event"); + flush = FALSE; + } + + if (flush) { + GST_DEBUG_OBJECT (basesink, "flushing upstream"); + gst_pad_push_event (pad, gst_event_new_flush_start ()); + gst_base_sink_flush_start (basesink, pad); + } else { + GST_DEBUG_OBJECT (basesink, "pausing pulling thread"); + } + + GST_PAD_STREAM_LOCK (pad); + + if (flush) { + GST_DEBUG_OBJECT (basesink, "stop flushing upstream"); + gst_pad_push_event (pad, gst_event_new_flush_stop ()); + gst_base_sink_flush_stop (basesink, pad); + } else { + GST_DEBUG_OBJECT (basesink, "restarting pulling thread"); + } + + GST_PAD_STREAM_UNLOCK (pad); + + return FALSE; +} + /* with STREAM_LOCK */ static void @@ -2873,6 +2986,7 @@ gst_base_sink_loop (GstPad * pad) GstBuffer *buf = NULL; GstFlowReturn result; guint blocksize; + guint64 offset; basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad)); @@ -2881,17 +2995,21 @@ gst_base_sink_loop (GstPad * pad) if ((blocksize = basesink->priv->blocksize) == 0) blocksize = -1; - GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u", - basesink->offset, blocksize); + offset = basesink->segment.last_stop; - result = gst_pad_pull_range (pad, basesink->offset, blocksize, &buf); + GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u", + offset, blocksize); + + result = gst_pad_pull_range (pad, offset, blocksize, &buf); if (G_UNLIKELY (result != GST_FLOW_OK)) goto paused; if (G_UNLIKELY (buf == NULL)) goto no_buffer; - basesink->offset += GST_BUFFER_SIZE (buf); + offset += GST_BUFFER_SIZE (buf); + + gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset); GST_PAD_PREROLL_LOCK (pad); result = gst_base_sink_chain_unlocked (basesink, pad, buf); @@ -2909,13 +3027,22 @@ paused: gst_pad_pause_task (pad); /* fatal errors and NOT_LINKED cause EOS */ if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) { - /* FIXME, we shouldn't post EOS when we are operating in segment mode */ - gst_base_sink_event (pad, gst_event_new_eos ()); - /* EOS does not cause an ERROR message */ - if (result != GST_FLOW_UNEXPECTED) { + if (result == GST_FLOW_UNEXPECTED) { + /* perform EOS logic */ + if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) { + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_segment_done (GST_OBJECT_CAST (basesink), + basesink->segment.format, basesink->segment.last_stop)); + } else { + gst_base_sink_event (pad, gst_event_new_eos ()); + } + } else { + /* for fatal errors we post an error message, post the error + * first so the app knows about the error first. */ GST_ELEMENT_ERROR (basesink, STREAM, FAILED, (_("Internal data stream error.")), ("stream stopped, reason %s", gst_flow_get_name (result))); + gst_base_sink_event (pad, gst_event_new_eos ()); } } return; @@ -2923,6 +3050,8 @@ paused: no_buffer: { GST_LOG_OBJECT (basesink, "no buffer, pausing"); + GST_ELEMENT_ERROR (basesink, STREAM, FAILED, + (_("Internal data flow error.")), ("element returned NULL buffer")); result = GST_FLOW_ERROR; goto paused; } @@ -3164,8 +3293,8 @@ gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active) /* we mark we have a newsegment here because pull based * mode works just fine without having a newsegment before the * first buffer */ - gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); - gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&basesink->segment, GST_FORMAT_BYTES); + gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_BYTES); GST_OBJECT_LOCK (basesink); basesink->have_newsegment = TRUE; GST_OBJECT_UNLOCK (basesink); @@ -3205,9 +3334,16 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event) GstPad *pad; GstBaseSink *basesink = GST_BASE_SINK (element); gboolean forward, result = TRUE; + GstActivateMode mode; - /* only push UPSTREAM events upstream */ - forward = GST_EVENT_IS_UPSTREAM (event); + GST_OBJECT_LOCK (element); + /* get the pad and the scheduling mode */ + pad = gst_object_ref (basesink->sinkpad); + mode = basesink->pad_mode; + GST_OBJECT_UNLOCK (element); + + /* only push UPSTREAM events upstream and if we are in push mode */ + forward = GST_EVENT_IS_UPSTREAM (event) && (mode == GST_ACTIVATE_PUSH); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_LATENCY: @@ -3229,22 +3365,23 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event) * when a particular piece of data will be rendered. */ break; } + case GST_EVENT_SEEK: + /* in pull mode we will execute the seek */ + if (mode == GST_ACTIVATE_PULL) + result = gst_base_sink_perform_seek (basesink, pad, event); + break; default: break; } if (forward) { - GST_OBJECT_LOCK (element); - pad = gst_object_ref (basesink->sinkpad); - GST_OBJECT_UNLOCK (element); - result = gst_pad_push_event (pad, event); - - gst_object_unref (pad); } else { /* not forwarded, unref the event */ gst_event_unref (event); } + + gst_object_unref (pad); return result; } @@ -3621,21 +3758,6 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) } switch (transition) { - case GST_STATE_CHANGE_READY_TO_PAUSED: -#if 0 - /* note that this is the upward case, which doesn't follow most - patterns */ - if (basesink->pad_mode == GST_ACTIVATE_PULL) { - GST_DEBUG_OBJECT (basesink, "basesink activated in pull mode, " - "returning SUCCESS directly"); - GST_PAD_PREROLL_LOCK (basesink->sinkpad); - gst_element_post_message (GST_ELEMENT_CAST (basesink), - gst_message_new_async_done (GST_OBJECT_CAST (basesink))); - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); - ret = GST_STATE_CHANGE_SUCCESS; - } -#endif - break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED"); /* FIXME, make sure we cannot enter _render first */ diff --git a/libs/gst/base/gstbasesink.h b/libs/gst/base/gstbasesink.h index d7fb931ea0..ed9222e46c 100644 --- a/libs/gst/base/gstbasesink.h +++ b/libs/gst/base/gstbasesink.h @@ -187,6 +187,7 @@ struct _GstBaseSinkClass { GType gst_base_sink_get_type(void); +GstFlowReturn gst_base_sink_do_preroll (GstBaseSink *sink, GstMiniObject *obj); GstFlowReturn gst_base_sink_wait_preroll (GstBaseSink *sink); /* synchronizing against the clock */