Add method to commit the state in subclasses.

Original commit message from CVS:
* docs/libs/gstreamer-libs-sections.txt:
* libs/gst/base/gstbasesink.c: (gst_base_sink_do_preroll),
(gst_base_sink_flush_start), (gst_base_sink_flush_stop),
(gst_base_sink_event), (gst_base_sink_perform_seek),
(gst_base_sink_loop), (gst_base_sink_pad_activate_pull),
(gst_base_sink_send_event), (gst_base_sink_change_state):
* libs/gst/base/gstbasesink.h:
Add method to commit the state in subclasses.
Refactor the flush_start and flush_stop code because we need it for
flushing while seeking too.
Implement the beginnings of seeking in pull mode.
Use the segment last_stop field for the pulling offset.
Fix the pause method in pull mode.
Configure the segment to BYTES for pull mode.
API: GstBaseSink::gst_base_sink_do_preroll()
This commit is contained in:
Wim Taymans 2008-10-16 14:09:18 +00:00
parent 63b0da36b5
commit fe07568997
4 changed files with 211 additions and 69 deletions

View file

@ -1,3 +1,21 @@
2008-10-16 Wim Taymans <wim.taymans@collabora.co.uk>
* docs/libs/gstreamer-libs-sections.txt:
* libs/gst/base/gstbasesink.c: (gst_base_sink_do_preroll),
(gst_base_sink_flush_start), (gst_base_sink_flush_stop),
(gst_base_sink_event), (gst_base_sink_perform_seek),
(gst_base_sink_loop), (gst_base_sink_pad_activate_pull),
(gst_base_sink_send_event), (gst_base_sink_change_state):
* libs/gst/base/gstbasesink.h:
Add method to commit the state in subclasses.
Refactor the flush_start and flush_stop code because we need it for
flushing while seeking too.
Implement the beginnings of seeking in pull mode.
Use the segment last_stop field for the pulling offset.
Fix the pause method in pull mode.
Configure the segment to BYTES for pull mode.
API: GstBaseSink::gst_base_sink_do_preroll()
2008-10-16 Wim Taymans <wim.taymans@collabora.co.uk>
* libs/gst/base/gstbasesrc.c: (gst_base_src_class_init):

View file

@ -258,6 +258,7 @@ GstBaseSinkClass
gst_base_sink_query_latency
gst_base_sink_get_latency
gst_base_sink_do_preroll
gst_base_sink_wait_preroll
gst_base_sink_wait_clock
gst_base_sink_wait_eos

View file

@ -1707,6 +1707,61 @@ stopping:
}
}
/**
* gst_base_sink_do_preroll:
* @sink: the sink
* @obj: the object that caused the preroll
*
* If the @sink spawns its own thread for pulling buffers from upstream it
* should call this method after it has pulled a buffer. If the element needed
* to preroll, this function will perform the preroll and will then block
* until the element state is changed.
*
* This function should be called with the PREROLL_LOCK held.
*
* Since 0.10.22
*
* Returns: #GST_FLOW_OK if the preroll completed and processing can
* continue. Any other return value should be returned from the render vmethod.
*/
GstFlowReturn
gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
{
GstFlowReturn ret;
while (G_UNLIKELY (sink->need_preroll)) {
GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
if (G_LIKELY (sink->playing_async)) {
/* commit state */
if (G_UNLIKELY (!gst_base_sink_commit_state (sink)))
goto stopping;
}
/* need to recheck here because the commit state could have
* made us not need the preroll anymore */
if (G_LIKELY (sink->need_preroll)) {
/* block until the state changes, or we get a flush, or something */
ret = gst_base_sink_wait_preroll (sink);
if (ret != GST_FLOW_OK)
goto flushing;
}
}
return GST_FLOW_OK;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (sink, "we are flushing");
return ret;
}
stopping:
{
GST_DEBUG_OBJECT (sink, "stopping while commiting state");
return GST_FLOW_WRONG_STATE;
}
}
/**
* gst_base_sink_wait_eos:
* @sink: the sink
@ -2529,6 +2584,52 @@ was_eos:
}
}
static void
gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
{
/* make sure we are not blocked on the clock also clear any pending
* eos state. */
gst_base_sink_set_flushing (basesink, pad, TRUE);
/* we grab the stream lock but that is not needed since setting the
* sink to flushing would make sure no state commit is being done
* anymore */
GST_PAD_STREAM_LOCK (pad);
gst_base_sink_reset_qos (basesink);
if (basesink->priv->async_enabled) {
/* and we need to commit our state again on the next
* prerolled buffer */
basesink->playing_async = TRUE;
gst_element_lost_state (GST_ELEMENT_CAST (basesink));
} else {
basesink->priv->have_latency = TRUE;
basesink->need_preroll = FALSE;
}
gst_base_sink_set_last_buffer (basesink, NULL);
GST_PAD_STREAM_UNLOCK (pad);
}
static void
gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
{
/* unset flushing so we can accept new data, this also flushes out any EOS
* event. */
gst_base_sink_set_flushing (basesink, pad, FALSE);
/* for position reporting */
GST_OBJECT_LOCK (basesink);
basesink->priv->current_sstart = -1;
basesink->priv->current_sstop = -1;
basesink->priv->eos_rtime = -1;
if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
/* we need new segment info after the flush. */
basesink->have_newsegment = FALSE;
gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
}
GST_OBJECT_UNLOCK (basesink);
}
static gboolean
gst_base_sink_event (GstPad * pad, GstEvent * event)
{
@ -2612,26 +2713,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
/* make sure we are not blocked on the clock also clear any pending
* eos state. */
gst_base_sink_set_flushing (basesink, pad, TRUE);
/* we grab the stream lock but that is not needed since setting the
* sink to flushing would make sure no state commit is being done
* anymore */
GST_PAD_STREAM_LOCK (pad);
gst_base_sink_reset_qos (basesink);
if (basesink->priv->async_enabled) {
/* and we need to commit our state again on the next
* prerolled buffer */
basesink->playing_async = TRUE;
gst_element_lost_state (GST_ELEMENT_CAST (basesink));
} else {
basesink->priv->have_latency = TRUE;
basesink->need_preroll = FALSE;
}
gst_base_sink_set_last_buffer (basesink, NULL);
GST_PAD_STREAM_UNLOCK (pad);
gst_base_sink_flush_start (basesink, pad);
gst_event_unref (event);
break;
@ -2641,22 +2723,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
/* unset flushing so we can accept new data, this also flushes out any EOS
* event. */
gst_base_sink_set_flushing (basesink, pad, FALSE);
/* for position reporting */
GST_OBJECT_LOCK (basesink);
basesink->priv->current_sstart = -1;
basesink->priv->current_sstop = -1;
basesink->priv->eos_rtime = -1;
basesink->have_newsegment = FALSE;
GST_OBJECT_UNLOCK (basesink);
/* we need new segment info after the flush. */
gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (basesink->abidata.ABI.clip_segment,
GST_FORMAT_UNDEFINED);
gst_base_sink_flush_stop (basesink, pad);
gst_event_unref (event);
break;
@ -2864,6 +2931,52 @@ wrong_mode:
}
}
/* perform a seek, only executed in pull mode */
static gboolean
gst_base_sink_perform_seek (GstBaseSink * basesink, GstPad * pad,
GstEvent * event)
{
gboolean flush;
gdouble rate;
GstFormat seek_format;
GstSeekFlags flags;
GstSeekType cur_type, stop_type;
gint64 cur, stop;
if (event) {
GST_DEBUG_OBJECT (basesink, "performing seek with event %p", event);
gst_event_parse_seek (event, &rate, &seek_format, &flags,
&cur_type, &cur, &stop_type, &stop);
flush = flags & GST_SEEK_FLAG_FLUSH;
} else {
GST_DEBUG_OBJECT (basesink, "performing seek without event");
flush = FALSE;
}
if (flush) {
GST_DEBUG_OBJECT (basesink, "flushing upstream");
gst_pad_push_event (pad, gst_event_new_flush_start ());
gst_base_sink_flush_start (basesink, pad);
} else {
GST_DEBUG_OBJECT (basesink, "pausing pulling thread");
}
GST_PAD_STREAM_LOCK (pad);
if (flush) {
GST_DEBUG_OBJECT (basesink, "stop flushing upstream");
gst_pad_push_event (pad, gst_event_new_flush_stop ());
gst_base_sink_flush_stop (basesink, pad);
} else {
GST_DEBUG_OBJECT (basesink, "restarting pulling thread");
}
GST_PAD_STREAM_UNLOCK (pad);
return FALSE;
}
/* with STREAM_LOCK
*/
static void
@ -2873,6 +2986,7 @@ gst_base_sink_loop (GstPad * pad)
GstBuffer *buf = NULL;
GstFlowReturn result;
guint blocksize;
guint64 offset;
basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
@ -2881,17 +2995,21 @@ gst_base_sink_loop (GstPad * pad)
if ((blocksize = basesink->priv->blocksize) == 0)
blocksize = -1;
GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
basesink->offset, blocksize);
offset = basesink->segment.last_stop;
result = gst_pad_pull_range (pad, basesink->offset, blocksize, &buf);
GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
offset, blocksize);
result = gst_pad_pull_range (pad, offset, blocksize, &buf);
if (G_UNLIKELY (result != GST_FLOW_OK))
goto paused;
if (G_UNLIKELY (buf == NULL))
goto no_buffer;
basesink->offset += GST_BUFFER_SIZE (buf);
offset += GST_BUFFER_SIZE (buf);
gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
GST_PAD_PREROLL_LOCK (pad);
result = gst_base_sink_chain_unlocked (basesink, pad, buf);
@ -2909,13 +3027,22 @@ paused:
gst_pad_pause_task (pad);
/* fatal errors and NOT_LINKED cause EOS */
if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
/* FIXME, we shouldn't post EOS when we are operating in segment mode */
gst_base_sink_event (pad, gst_event_new_eos ());
/* EOS does not cause an ERROR message */
if (result != GST_FLOW_UNEXPECTED) {
if (result == GST_FLOW_UNEXPECTED) {
/* perform EOS logic */
if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
basesink->segment.format, basesink->segment.last_stop));
} else {
gst_base_sink_event (pad, gst_event_new_eos ());
}
} else {
/* for fatal errors we post an error message, post the error
* first so the app knows about the error first. */
GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
(_("Internal data stream error.")),
("stream stopped, reason %s", gst_flow_get_name (result)));
gst_base_sink_event (pad, gst_event_new_eos ());
}
}
return;
@ -2923,6 +3050,8 @@ paused:
no_buffer:
{
GST_LOG_OBJECT (basesink, "no buffer, pausing");
GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
(_("Internal data flow error.")), ("element returned NULL buffer"));
result = GST_FLOW_ERROR;
goto paused;
}
@ -3164,8 +3293,8 @@ gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
/* we mark we have a newsegment here because pull based
* mode works just fine without having a newsegment before the
* first buffer */
gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&basesink->segment, GST_FORMAT_BYTES);
gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_BYTES);
GST_OBJECT_LOCK (basesink);
basesink->have_newsegment = TRUE;
GST_OBJECT_UNLOCK (basesink);
@ -3205,9 +3334,16 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event)
GstPad *pad;
GstBaseSink *basesink = GST_BASE_SINK (element);
gboolean forward, result = TRUE;
GstActivateMode mode;
/* only push UPSTREAM events upstream */
forward = GST_EVENT_IS_UPSTREAM (event);
GST_OBJECT_LOCK (element);
/* get the pad and the scheduling mode */
pad = gst_object_ref (basesink->sinkpad);
mode = basesink->pad_mode;
GST_OBJECT_UNLOCK (element);
/* only push UPSTREAM events upstream and if we are in push mode */
forward = GST_EVENT_IS_UPSTREAM (event) && (mode == GST_ACTIVATE_PUSH);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_LATENCY:
@ -3229,22 +3365,23 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event)
* when a particular piece of data will be rendered. */
break;
}
case GST_EVENT_SEEK:
/* in pull mode we will execute the seek */
if (mode == GST_ACTIVATE_PULL)
result = gst_base_sink_perform_seek (basesink, pad, event);
break;
default:
break;
}
if (forward) {
GST_OBJECT_LOCK (element);
pad = gst_object_ref (basesink->sinkpad);
GST_OBJECT_UNLOCK (element);
result = gst_pad_push_event (pad, event);
gst_object_unref (pad);
} else {
/* not forwarded, unref the event */
gst_event_unref (event);
}
gst_object_unref (pad);
return result;
}
@ -3621,21 +3758,6 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
}
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
#if 0
/* note that this is the upward case, which doesn't follow most
patterns */
if (basesink->pad_mode == GST_ACTIVATE_PULL) {
GST_DEBUG_OBJECT (basesink, "basesink activated in pull mode, "
"returning SUCCESS directly");
GST_PAD_PREROLL_LOCK (basesink->sinkpad);
gst_element_post_message (GST_ELEMENT_CAST (basesink),
gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
ret = GST_STATE_CHANGE_SUCCESS;
}
#endif
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
/* FIXME, make sure we cannot enter _render first */

View file

@ -187,6 +187,7 @@ struct _GstBaseSinkClass {
GType gst_base_sink_get_type(void);
GstFlowReturn gst_base_sink_do_preroll (GstBaseSink *sink, GstMiniObject *obj);
GstFlowReturn gst_base_sink_wait_preroll (GstBaseSink *sink);
/* synchronizing against the clock */