diff --git a/ChangeLog b/ChangeLog index 539175ac26..6487b08948 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,26 @@ +2006-01-30 Wim Taymans + + * libs/gst/base/gstbasesink.c: (gst_base_sink_init), + (gst_base_sink_preroll_queue_empty), (gst_base_sink_commit_state), + (gst_base_sink_handle_object), (gst_base_sink_event), + (gst_base_sink_is_prerolled), (gst_base_sink_wait), + (gst_base_sink_do_sync), (gst_base_sink_handle_event), + (gst_base_sink_handle_buffer), (gst_base_sink_set_flushing), + (gst_base_sink_deactivate), (gst_base_sink_activate), + (gst_base_sink_activate_pull), (gst_base_sink_get_position), + (gst_base_sink_query), (gst_base_sink_change_state): + Basesink cleanups, remove some old code. + Handle the case where a subclass can preroll in the render + method (mostly audiosinks). + Handle more events. + Remove some locks around variables that are now protected + with the PREROLL_LOCK (clock_id, flushing, ..). + Optimize position query some more, do correct locking. + Remove old code to push queue in state change, this is not + needed anymore since preroll blocks on all prerollable items + now. + Almost implemented as described in design doc. + 2006-01-30 Wim Taymans * tests/check/gst/gstbin.c: (GST_START_TEST): diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index 78b309f35e..dfd14a8ab8 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -1,5 +1,5 @@ /* GStreamer - * Copyright (C) 2005 Wim Taymans + * Copyright (C) 2005,2006 Wim Taymans * * gstbasesink.c: Base class for sink elements * @@ -65,10 +65,16 @@ * After synchronisation the virtual method #GstBaseSink::render will be called. * Subclasses should minimally implement this method. * - * Upon receiving the EOS event in th PLAYING state, #GstBaseSink will wait for + * Since 0.10.3 subclasses that synchronize on the clock in the ::render method + * are supported as well. These classes typically receive a buffer in the render + * method and can then potentially block on the clock while rendering. A typical + * example would be an audiosink. + * + * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait for * the clock to reach the time indicated by the stop time of the last ::get_times * call before posting an EOS message. When the element receives EOS in PAUSED, - * the event is queued and an EOS message is posted when going to PLAYING. + * preroll completes, the event is queued and an EOS message is posted when going + * to PLAYING. * * #GstBaseSink will internally use the GST_EVENT_NEW_SEGMENT events to schedule * synchronisation and clipping of buffers. Buffers that fall completely outside @@ -78,7 +84,8 @@ * * #GstBaseSink will by default report the current playback position in * GST_FORMAT_TIME based on the current clock time and segment information. - * If the element is EOS however, the query will be forwarded upstream. + * If the element is EOS, PAUSED or no clock has been set on the element, the + * query will be forwarded upstream. * * The ::set_caps function will be called when the subclass should configure itself * to precess a specific media type. @@ -99,7 +106,7 @@ * operations they perform in the ::render method. This is mostly usefull when * the ::render method performs a blocking write on a file descripter. * - * Last reviewed on 2006-01-18 (0.10.0) + * Last reviewed on 2006-01-30 (0.10.3) */ #ifdef HAVE_CONFIG_H @@ -180,6 +187,8 @@ static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf); static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer, GstClockTime * start, GstClockTime * end); +static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink, + GstPad * pad, gboolean flushing); static GstStateChangeReturn gst_base_sink_change_state (GstElement * element, GstStateChange transition); @@ -335,7 +344,6 @@ gst_base_sink_init (GstBaseSink * basesink, gpointer g_class) gst_element_add_pad (GST_ELEMENT (basesink), basesink->sinkpad); basesink->pad_mode = GST_ACTIVATE_NONE; - GST_PAD_TASK (basesink->sinkpad) = NULL; basesink->preroll_queue = g_queue_new (); basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH; @@ -425,7 +433,7 @@ gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, return GST_FLOW_OK; } -/* with PREROLL_LOCK */ +/* with PREROLL_LOCK, STREAM_LOCK */ static GstFlowReturn gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad) { @@ -438,44 +446,25 @@ gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad) if (q) { GST_DEBUG_OBJECT (basesink, "emptying queue"); while ((obj = g_queue_pop_head (q))) { - gboolean is_buffer; + basesink->preroll_queued--; - is_buffer = GST_IS_BUFFER (obj); - if (G_LIKELY (is_buffer)) { - basesink->preroll_queued--; + if (G_LIKELY (GST_IS_BUFFER (obj))) { basesink->buffers_queued--; - } else { - switch (GST_EVENT_TYPE (obj)) { - case GST_EVENT_EOS: - basesink->preroll_queued--; - break; - default: - break; - } - basesink->events_queued--; - } - /* we release the preroll lock while pushing so that we - * can still flush it while blocking on the clock or - * inside the element. */ - GST_PAD_PREROLL_UNLOCK (pad); - - if (G_LIKELY (is_buffer)) { GST_DEBUG_OBJECT (basesink, "popped buffer %p", obj); ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER_CAST (obj)); } else { + basesink->events_queued--; GST_DEBUG_OBJECT (basesink, "popped event %p", obj); gst_base_sink_handle_event (basesink, GST_EVENT_CAST (obj)); ret = GST_FLOW_OK; } - - GST_PAD_PREROLL_LOCK (pad); } GST_DEBUG_OBJECT (basesink, "queue empty"); } return ret; } -/* with PREROLL_LOCK */ +/* with PREROLL_LOCK, STREAM_LOCK */ static void gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad) { @@ -500,74 +489,80 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad) GST_PAD_PREROLL_SIGNAL (pad); } -/* with PREROLL_LOCK */ +/* with PREROLL_LOCK, STREAM_LOCK */ static gboolean gst_base_sink_commit_state (GstBaseSink * basesink) { /* commit state and proceed to next pending state */ - { - GstState current, next, pending, post_pending; - GstMessage *message; - gboolean post_paused = FALSE; - gboolean post_playing = FALSE; + GstState current, next, pending, post_pending; + GstMessage *message; + gboolean post_paused = FALSE; + gboolean post_playing = FALSE; - GST_OBJECT_LOCK (basesink); - current = GST_STATE (basesink); - next = GST_STATE_NEXT (basesink); - pending = GST_STATE_PENDING (basesink); - post_pending = pending; + GST_OBJECT_LOCK (basesink); + current = GST_STATE (basesink); + next = GST_STATE_NEXT (basesink); + pending = GST_STATE_PENDING (basesink); + post_pending = pending; - switch (pending) { - case GST_STATE_PLAYING: - basesink->need_preroll = FALSE; - post_playing = TRUE; - /* post PAUSED too when we were READY */ - if (current == GST_STATE_READY) { - post_paused = TRUE; - } - break; - case GST_STATE_PAUSED: - basesink->need_preroll = TRUE; + switch (pending) { + case GST_STATE_PLAYING: + basesink->need_preroll = FALSE; + post_playing = TRUE; + /* post PAUSED too when we were READY */ + if (current == GST_STATE_READY) { post_paused = TRUE; - post_pending = GST_STATE_VOID_PENDING; - break; - case GST_STATE_READY: - goto stopping; - default: - break; - } - - if (pending != GST_STATE_VOID_PENDING) { - GST_STATE (basesink) = pending; - GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING; - GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING; - GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS; - } - GST_OBJECT_UNLOCK (basesink); - - if (post_paused) { - message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), - current, next, post_pending); - gst_element_post_message (GST_ELEMENT_CAST (basesink), message); - } - if (post_playing) { - message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), - next, pending, GST_STATE_VOID_PENDING); - gst_element_post_message (GST_ELEMENT_CAST (basesink), message); - } - /* and mark dirty */ - if (post_paused || post_playing) { - gst_element_post_message (GST_ELEMENT_CAST (basesink), - gst_message_new_state_dirty (GST_OBJECT_CAST (basesink))); - } - - GST_STATE_BROADCAST (basesink); + } + break; + case GST_STATE_PAUSED: + basesink->need_preroll = TRUE; + post_paused = TRUE; + post_pending = GST_STATE_VOID_PENDING; + break; + case GST_STATE_READY: + case GST_STATE_NULL: + goto stopping; + case GST_STATE_VOID_PENDING: + goto nothing_pending; + default: + break; } + + GST_STATE (basesink) = pending; + GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING; + GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING; + GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS; + GST_OBJECT_UNLOCK (basesink); + + if (post_paused) { + message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), + current, next, post_pending); + gst_element_post_message (GST_ELEMENT_CAST (basesink), message); + } + if (post_playing) { + message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), + next, pending, GST_STATE_VOID_PENDING); + gst_element_post_message (GST_ELEMENT_CAST (basesink), message); + } + /* and mark dirty */ + if (post_paused || post_playing) { + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_state_dirty (GST_OBJECT_CAST (basesink))); + } + + GST_STATE_BROADCAST (basesink); + return TRUE; +nothing_pending: + { + GST_OBJECT_UNLOCK (basesink); + return TRUE; + } stopping: { /* app is going to READY */ + basesink->need_preroll = FALSE; GST_OBJECT_UNLOCK (basesink); return FALSE; } @@ -579,59 +574,21 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, GstMiniObject * obj) { gint length; - gboolean have_event; + gboolean have_buffer; GstFlowReturn ret; GST_PAD_PREROLL_LOCK (pad); + if (basesink->flushing) + goto in_flushing; + /* push object on the queue */ GST_DEBUG_OBJECT (basesink, "push %p on preroll_queue", obj); g_queue_push_tail (basesink->preroll_queue, obj); - have_event = GST_IS_EVENT (obj); - if (have_event) { - GstEvent *event = GST_EVENT (obj); - - switch (GST_EVENT_TYPE (obj)) { - case GST_EVENT_EOS: - basesink->preroll_queued++; - basesink->eos = TRUE; - basesink->eos_queued = TRUE; - break; - case GST_EVENT_NEWSEGMENT: - { - gboolean update; - gdouble rate; - GstFormat format; - gint64 start; - gint64 stop; - gint64 time; - - /* the newsegment event is needed to bring the buffer timestamps to the - * stream time and to drop samples outside of the playback segment. */ - gst_event_parse_new_segment (event, &update, &rate, &format, - &start, &stop, &time); - - basesink->have_newsegment = TRUE; - - gst_segment_set_newsegment (&basesink->segment, update, rate, format, - start, stop, time); - - GST_DEBUG_OBJECT (basesink, - "received NEWSEGMENT %" GST_TIME_FORMAT " -- %" - GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %" - GST_TIME_FORMAT, - GST_TIME_ARGS (basesink->segment.start), - GST_TIME_ARGS (basesink->segment.stop), - GST_TIME_ARGS (basesink->segment.time), - GST_TIME_ARGS (basesink->segment.accum)); - break; - } - default: - break; - } - basesink->events_queued++; - } else { - GstBuffer *buf = GST_BUFFER (obj); + have_buffer = GST_IS_BUFFER (obj); + if (G_LIKELY (have_buffer)) { + GstBuffer *buf = GST_BUFFER_CAST (obj); + GstClockTime start = -1, end = -1; if (!basesink->have_newsegment) { GST_ELEMENT_WARNING (basesink, STREAM, FAILED, @@ -644,26 +601,34 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, } /* check if the buffer needs to be dropped */ - if (TRUE) { - GstClockTime start = -1, end = -1; + /* we don't use the subclassed method as it may not return + * valid values for our purpose here */ + gst_base_sink_get_times (basesink, buf, &start, &end); - /* we don't use the subclassed method as it may not return - * valid values for our purpose here */ - gst_base_sink_get_times (basesink, buf, &start, &end); + GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT + ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end)); - GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT - ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), - GST_TIME_ARGS (end)); - - if (GST_CLOCK_TIME_IS_VALID (start) && - (basesink->segment.format == GST_FORMAT_TIME)) { - if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, - (gint64) start, (gint64) end, NULL, NULL)) - goto dropping; - } + if (GST_CLOCK_TIME_IS_VALID (start) && + (basesink->segment.format == GST_FORMAT_TIME)) { + if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, + (gint64) start, (gint64) end, NULL, NULL)) + goto out_of_segment; } - basesink->preroll_queued++; basesink->buffers_queued++; + basesink->preroll_queued++; + } else { + GstEvent *event = GST_EVENT_CAST (obj); + + switch (GST_EVENT_TYPE (event)) { + /* only EOS finishes preroll */ + case GST_EVENT_EOS: + basesink->preroll_queued++; + basesink->eos_queued = TRUE; + break; + default: + break; + } + basesink->events_queued++; } GST_DEBUG_OBJECT (basesink, @@ -672,17 +637,19 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, basesink->buffers_queued, basesink->events_queued); /* check if we are prerolling */ - if (!basesink->need_preroll) + if (G_LIKELY (!basesink->need_preroll)) goto no_preroll; - /* there is a buffer queued */ - if (basesink->buffers_queued == 1) { + length = basesink->preroll_queued; + GST_DEBUG_OBJECT (basesink, "prerolled length %d", length); + + if (length == 1) { GST_DEBUG_OBJECT (basesink, "do preroll %p", obj); /* if it's a buffer, we need to call the preroll method */ - if (GST_IS_BUFFER (obj)) { + if (G_LIKELY (have_buffer)) { GstBaseSinkClass *bclass; - GstBuffer *buf = GST_BUFFER (obj); + GstBuffer *buf = GST_BUFFER_CAST (obj); GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); @@ -692,71 +659,47 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK) goto preroll_failed; } - } - length = basesink->preroll_queued; - GST_DEBUG_OBJECT (basesink, "prerolled length %d", length); - - if (length == 1) { - - basesink->have_preroll = TRUE; /* commit state */ if (!gst_base_sink_commit_state (basesink)) goto stopping; - GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) - goto flushing; - GST_OBJECT_UNLOCK (pad); - /* it is possible that commiting the state made us go to PLAYING * now in which case we don't need to block anymore. */ - if (!basesink->need_preroll) + if (!basesink->need_preroll) { goto no_preroll; - - length = basesink->preroll_queued; - - /* FIXME: a pad probe could have made us lose the buffer, according - * to one of the python tests */ - if (length == 0) { - GST_ERROR_OBJECT (basesink, - "preroll_queued dropped from 1 to 0 while committing state change"); } - g_assert (length <= 1); } - /* see if we need to block now. We cannot block on events, only - * on buffers, the reason is that events can be sent from the - * application thread and we don't want to block there. */ - if (length <= basesink->preroll_queue_max_len || have_event) + /* see if we need to block now. */ + if (G_UNLIKELY (length <= basesink->preroll_queue_max_len)) goto more_preroll; /* block until the state changes, or we get a flush, or something */ GST_DEBUG_OBJECT (basesink, "waiting to finish preroll"); + basesink->have_preroll = TRUE; GST_PAD_PREROLL_WAIT (pad); + basesink->have_preroll = FALSE; GST_DEBUG_OBJECT (basesink, "done preroll"); - GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + if (G_UNLIKELY (basesink->flushing)) goto flushing; - GST_OBJECT_UNLOCK (pad); /* we can start rendering the data now */ no_preroll: GST_DEBUG_OBJECT (basesink, "no preroll needed"); - /* maybe it was another sink that blocked in preroll, need to check for - buffers to drain */ - basesink->have_preroll = FALSE; + /* render all our buffers now */ ret = gst_base_sink_preroll_queue_empty (basesink, pad); + GST_PAD_PREROLL_UNLOCK (pad); return ret; /* special cases */ -dropping: +out_of_segment: { GstBuffer *buf; - GST_DEBUG_OBJECT (basesink, "dropping buffer"); + GST_DEBUG_OBJECT (basesink, "dropping buffer, out of segment"); /* take the buffer off the queue again */ buf = GST_BUFFER (g_queue_pop_tail (basesink->preroll_queue)); @@ -765,12 +708,19 @@ dropping: return GST_FLOW_OK; } +in_flushing: + { + gst_mini_object_unref (obj); + GST_PAD_PREROLL_UNLOCK (pad); + GST_DEBUG_OBJECT (basesink, "sink is flushing"); + + return GST_FLOW_WRONG_STATE; + } flushing: { - GST_OBJECT_UNLOCK (pad); gst_base_sink_preroll_queue_flush (basesink, pad); GST_PAD_PREROLL_UNLOCK (pad); - GST_DEBUG_OBJECT (basesink, "pad is flushing"); + GST_DEBUG_OBJECT (basesink, "sink is flushing"); return GST_FLOW_WRONG_STATE; } @@ -825,58 +775,86 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) } case GST_EVENT_NEWSEGMENT: { - GstFlowReturn ret; + /* the new segment is a non prerollable item and does not block anything, + * we can just configure the current segment and return. */ + /* FIXME, if the preroll-queue-len > 1 we need to queue the newsegment + * and activate them before attempting the synchronize */ + gboolean update; + gdouble rate; + GstFormat format; + gint64 start; + gint64 stop; + gint64 time; - ret = - gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (event)); + /* the newsegment event is needed to bring the buffer timestamps to the + * stream time and to drop samples outside of the playback segment. */ + gst_event_parse_new_segment (event, &update, &rate, &format, + &start, &stop, &time); + + basesink->have_newsegment = TRUE; + + GST_OBJECT_LOCK (basesink); + gst_segment_set_newsegment (&basesink->segment, update, rate, format, + start, stop, time); + + GST_DEBUG_OBJECT (basesink, + "received NEWSEGMENT %" GST_TIME_FORMAT " -- %" + GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %" + GST_TIME_FORMAT, + GST_TIME_ARGS (basesink->segment.start), + GST_TIME_ARGS (basesink->segment.stop), + GST_TIME_ARGS (basesink->segment.time), + GST_TIME_ARGS (basesink->segment.accum)); + GST_OBJECT_UNLOCK (basesink); + + gst_event_unref (event); break; } case GST_EVENT_FLUSH_START: - /* make sure we are not blocked on the clock also clear any pending - * eos state. */ if (bclass->event) bclass->event (basesink, event); - GST_OBJECT_LOCK (basesink); - basesink->flushing = TRUE; - if (basesink->clock_id) { - gst_clock_id_unschedule (basesink->clock_id); - } - GST_OBJECT_UNLOCK (basesink); - - GST_PAD_PREROLL_LOCK (pad); - /* we need preroll after the flush */ - GST_DEBUG_OBJECT (basesink, "flushing, need preroll after flush"); - basesink->need_preroll = TRUE; - /* unlock from a possible state change/preroll */ - gst_base_sink_preroll_queue_flush (basesink, pad); - GST_PAD_PREROLL_UNLOCK (pad); + /* make sure we are not blocked on the clock also clear any pending + * eos state. */ + gst_base_sink_set_flushing (basesink, pad, TRUE); + /* we grab the stream lock but that is not needed since setting the + * sink to flushing would make sure no state commit is being done + * anymore */ + GST_PAD_STREAM_LOCK (pad); /* and we need to commit our state again on the next * prerolled buffer */ - GST_PAD_STREAM_LOCK (pad); gst_element_lost_state (GST_ELEMENT (basesink)); - GST_PAD_STREAM_UNLOCK (pad); GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); + GST_PAD_STREAM_UNLOCK (pad); + gst_event_unref (event); break; case GST_EVENT_FLUSH_STOP: if (bclass->event) bclass->event (basesink, event); - /* now we are completely unblocked and the _chain method - * will return */ - GST_OBJECT_LOCK (basesink); - basesink->flushing = FALSE; - GST_OBJECT_UNLOCK (basesink); + /* unset flushing so we can accept new data */ + gst_base_sink_set_flushing (basesink, pad, FALSE); + /* we need new segment info after the flush. */ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + basesink->have_newsegment = FALSE; GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); gst_event_unref (event); break; default: - gst_event_unref (event); + /* other events are sent to queue or subclass depending on if they + * are serialized. */ + if (GST_EVENT_IS_SERIALIZED (event)) { + gst_base_sink_handle_object (basesink, pad, + GST_MINI_OBJECT_CAST (event)); + } else { + if (bclass->event) + bclass->event (basesink, event); + gst_event_unref (event); + } break; } gst_object_unref (basesink); @@ -905,34 +883,37 @@ gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer, } } -/* with STREAM_LOCK and LOCK */ +/* must be called with PREROLL_LOCK */ +static gboolean +gst_base_sink_is_prerolled (GstBaseSink * basesink) +{ + gboolean res; + + res = basesink->have_preroll || basesink->eos; + GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => prerolled: %d", + basesink->have_preroll, basesink->eos, res); + return res; +} + +/* with STREAM_LOCK, PREROLL_LOCK, id should be a valid GstClockID */ static GstClockReturn -gst_base_sink_wait (GstBaseSink * basesink, GstClockTime time) +gst_base_sink_wait (GstBaseSink * basesink, GstClockID id) { GstClockReturn ret; - GstClockID id; - - /* no need to attempt a clock wait if we are flushing */ - if (basesink->flushing) { - return GST_CLOCK_UNSCHEDULED; - } - - /* clock_id should be NULL outside of this function */ - g_assert (basesink->clock_id == NULL); - g_assert (GST_CLOCK_TIME_IS_VALID (time)); - - id = gst_clock_new_single_shot_id (GST_ELEMENT_CLOCK (basesink), time); basesink->clock_id = id; - /* release the object lock while waiting */ - GST_OBJECT_UNLOCK (basesink); + /* release the preroll lock while waiting */ + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); ret = gst_clock_id_wait (id, NULL); - GST_OBJECT_LOCK (basesink); + GST_PAD_PREROLL_LOCK (basesink->sinkpad); gst_clock_id_unref (id); basesink->clock_id = NULL; + if (basesink->flushing) + ret = GST_CLOCK_UNSCHEDULED; + return ret; } @@ -944,6 +925,8 @@ gst_base_sink_wait (GstBaseSink * basesink, GstClockTime time) * the clock, save the entry so we can unlock it * 4) wait on the clock, this blocks * 5) unref the clockid again + * + * Is called with STREAM_LOCK */ static GstClockReturn gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) @@ -953,6 +936,8 @@ gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) gint64 cstart, cend; GstBaseSinkClass *bclass; GstClockTime base_time; + GstClock *clock; + GstClockID id; GstClockTimeDiff stream_start, stream_end; bclass = GST_BASE_SINK_GET_CLASS (basesink); @@ -981,36 +966,36 @@ gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) } /* save last valid times seen. */ - if (GST_CLOCK_TIME_IS_VALID (end)) + if (GST_CLOCK_TIME_IS_VALID (cend)) gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, - (gint64) end); + (gint64) cend); else gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, - (gint64) start); + (gint64) cstart); } else { /* no clipping for formats different from GST_FORMAT_TIME */ cstart = start; cend = end; } - GST_OBJECT_LOCK (basesink); - if (!basesink->sync) - goto no_sync; - - if (GST_ELEMENT_CLOCK (basesink) == NULL) - goto no_clock; - if (!((basesink->segment.format == GST_FORMAT_TIME) || (basesink->segment.accum == 0))) goto no_segment; + GST_OBJECT_LOCK (basesink); + if (!basesink->sync) + goto no_sync; + + if ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL) + goto no_clock; + /* now do clocking, LOCK is helt */ stream_start = gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cstart); - stream_end = - gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cend); base_time = GST_ELEMENT_CAST (basesink)->base_time; + id = gst_clock_new_single_shot_id (clock, stream_start + base_time); + GST_OBJECT_UNLOCK (basesink); GST_LOG_OBJECT (basesink, "waiting for clock, base time %" GST_TIME_FORMAT @@ -1019,14 +1004,15 @@ gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) /* also save end_time of this buffer so that we can wait * to signal EOS */ + stream_end = + gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cend); + if (GST_CLOCK_TIME_IS_VALID (stream_end)) basesink->end_time = stream_end + base_time; else basesink->end_time = GST_CLOCK_TIME_NONE; - result = gst_base_sink_wait (basesink, stream_start + base_time); - - GST_OBJECT_UNLOCK (basesink); + result = gst_base_sink_wait (basesink, id); GST_LOG_OBJECT (basesink, "clock entry done: %d", result); @@ -1041,7 +1027,8 @@ invalid_start: out_of_segment: { GST_LOG_OBJECT (basesink, "buffer skipped, not in segment"); - return GST_CLOCK_UNSCHEDULED; + /* we return BADTIME to make callers drop this buffer. */ + return GST_CLOCK_BADTIME; } no_sync: { @@ -1058,7 +1045,6 @@ no_clock: no_segment: { GST_DEBUG_OBJECT (basesink, "no segment info, can't sync"); - GST_OBJECT_UNLOCK (basesink); return GST_CLOCK_OK; } } @@ -1068,6 +1054,8 @@ no_segment: * * 2) render the event * 3) unref the event + * + * called with STREAM_LOCK, PREROLL_LOCK */ static inline gboolean gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) @@ -1077,16 +1065,30 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: + { + GstClock *clock; + gboolean unlock = TRUE; + GST_OBJECT_LOCK (basesink); - if (GST_ELEMENT_CLOCK (basesink)) { + basesink->eos = TRUE; + basesink->eos_queued = FALSE; + if ((clock = GST_ELEMENT_CLOCK (basesink)) != NULL) { /* wait for last buffer to finish if we have a valid end time */ if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) { - gst_base_sink_wait (basesink, basesink->end_time); + GstClockID id; + + id = gst_clock_new_single_shot_id (clock, basesink->end_time); + unlock = FALSE; + GST_OBJECT_UNLOCK (basesink); + + gst_base_sink_wait (basesink, id); basesink->end_time = GST_CLOCK_TIME_NONE; } } - GST_OBJECT_UNLOCK (basesink); + if (unlock) + GST_OBJECT_UNLOCK (basesink); break; + } default: break; } @@ -1099,16 +1101,13 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: - GST_PAD_PREROLL_LOCK (basesink->sinkpad); /* if we are still EOS, we can post the EOS message */ if (basesink->eos) { /* ok, now we can post the message */ GST_DEBUG_OBJECT (basesink, "Now posting EOS"); gst_element_post_message (GST_ELEMENT_CAST (basesink), gst_message_new_eos (GST_OBJECT_CAST (basesink))); - basesink->eos_queued = FALSE; } - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; default: break; @@ -1125,6 +1124,8 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) * 1) first sync on the buffer * 2) render the buffer * 3) unref the buffer + * + * called with STREAM_LOCK, PREROLL_LOCK */ static inline GstFlowReturn gst_base_sink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf) @@ -1146,6 +1147,10 @@ gst_base_sink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf) ret = bclass->render (basesink, buf); break; } + case GST_CLOCK_BADTIME: + /* when out of segment, should normally not happen */ + case GST_CLOCK_UNSCHEDULED: + /* unlocked by a state change */ default: GST_DEBUG_OBJECT (basesink, "clock returned %d, not rendering", status); break; @@ -1223,33 +1228,43 @@ paused: } } +static gboolean +gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad, + gboolean flushing) +{ + GST_PAD_PREROLL_LOCK (pad); + basesink->flushing = flushing; + if (flushing) { + GstBaseSinkClass *bclass; + + bclass = GST_BASE_SINK_GET_CLASS (basesink); + + /* step 1, unblock clock sync (if any) or any other blocking thing */ + basesink->need_preroll = TRUE; + if (basesink->clock_id) { + gst_clock_id_unschedule (basesink->clock_id); + } + + /* unlock any subclasses */ + if (bclass->unlock) + bclass->unlock (basesink); + + /* flush out the data thread if it's locked in finish_preroll */ + GST_DEBUG_OBJECT (basesink, + "flushing out data thread, need preroll to TRUE"); + gst_base_sink_preroll_queue_flush (basesink, pad); + } + GST_PAD_PREROLL_UNLOCK (pad); + + return TRUE; +} + static gboolean gst_base_sink_deactivate (GstBaseSink * basesink, GstPad * pad) { - gboolean result = FALSE; - GstBaseSinkClass *bclass; + gboolean result; - bclass = GST_BASE_SINK_GET_CLASS (basesink); - - /* step 1, unblock clock sync (if any) or any other blocking thing */ - GST_PAD_PREROLL_LOCK (pad); - GST_OBJECT_LOCK (basesink); - if (basesink->clock_id) { - gst_clock_id_unschedule (basesink->clock_id); - } - GST_OBJECT_UNLOCK (basesink); - - /* unlock any subclasses */ - if (bclass->unlock) - bclass->unlock (basesink); - - /* flush out the data thread if it's locked in finish_preroll */ - GST_DEBUG_OBJECT (basesink, - "flushing out data thread, need preroll to FALSE"); - basesink->need_preroll = FALSE; - gst_base_sink_preroll_queue_flush (basesink, pad); - GST_PAD_PREROLL_SIGNAL (pad); - GST_PAD_PREROLL_UNLOCK (pad); + gst_base_sink_set_flushing (basesink, pad, TRUE); /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); @@ -1267,6 +1282,8 @@ gst_base_sink_activate (GstPad * pad) GST_DEBUG_OBJECT (basesink, "Trying pull mode first"); + gst_base_sink_set_flushing (basesink, pad, FALSE); + if (basesink->can_activate_pull && gst_pad_check_pull_range (pad) && gst_pad_activate_pull (pad, TRUE)) { GST_DEBUG_OBJECT (basesink, "Success activating pull mode"); @@ -1281,6 +1298,7 @@ gst_base_sink_activate (GstPad * pad) if (!result) { GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode"); + gst_base_sink_set_flushing (basesink, pad, TRUE); } gst_object_unref (basesink); @@ -1341,8 +1359,11 @@ gst_base_sink_activate_pull (GstPad * pad, gboolean active) basesink->pad_mode = GST_ACTIVATE_NONE; } else { if (gst_pad_activate_pull (peer, TRUE)) { - basesink->have_newsegment = TRUE; + /* we mark we have a newsegment here because pull based + * mode works just fine without having a newsegment before the + * first buffer */ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + basesink->have_newsegment = TRUE; /* set the pad mode before starting the task so that it's in the correct state for the new thread... */ @@ -1366,7 +1387,6 @@ gst_base_sink_activate_pull (GstPad * pad, gboolean active) g_warning ("Internal GStreamer activation error!!!"); result = FALSE; } else { - basesink->have_newsegment = FALSE; result = gst_base_sink_deactivate (basesink, pad); basesink->pad_mode = GST_ACTIVATE_NONE; } @@ -1419,55 +1439,65 @@ gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format, switch (format) { case GST_FORMAT_TIME: { + GstClockTime now, base; + gint64 time, accum; + gdouble abs_rate; + /* we can answer time format */ GST_OBJECT_LOCK (basesink); + /* can only give answer if not EOS */ + if (G_UNLIKELY (basesink->eos)) + goto wrong_state; + /* We get position from clock only in PLAYING */ - if (GST_STATE (basesink) != GST_STATE_PLAYING) { - GST_OBJECT_UNLOCK (basesink); - goto beach; - } + if (GST_STATE (basesink) != GST_STATE_PLAYING) + goto wrong_state; - if ((clock = GST_ELEMENT_CLOCK (basesink))) { - GstClockTime now, base; - gint64 time; + /* and we need a clock */ + if ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL) + goto wrong_state; - gst_object_ref (clock); - GST_OBJECT_UNLOCK (basesink); + /* collect all data we need holding the lock */ + if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time)) + time = basesink->segment.time; + else + time = 0; - now = gst_clock_get_time (clock); + base = GST_ELEMENT_CAST (basesink)->base_time; + accum = basesink->segment.accum; + abs_rate = basesink->segment.abs_rate; - GST_OBJECT_LOCK (basesink); - if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time)) - time = basesink->segment.time; - else - time = 0; - - base = GST_ELEMENT_CAST (basesink)->base_time; - base += basesink->segment.accum; - base = MIN (now, base); - *cur = - gst_guint64_to_gdouble (now - base) * basesink->segment.abs_rate + - time; - - GST_DEBUG_OBJECT (basesink, - "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %" - GST_TIME_FORMAT " + time %" GST_TIME_FORMAT " = %" GST_TIME_FORMAT, - GST_TIME_ARGS (now), GST_TIME_ARGS (base), - GST_TIME_ARGS (basesink->segment.accum), GST_TIME_ARGS (time), - GST_TIME_ARGS (*cur)); - - gst_object_unref (clock); - - res = TRUE; - } + gst_object_ref (clock); + /* need to release the object lock before we can get the time */ GST_OBJECT_UNLOCK (basesink); + + now = gst_clock_get_time (clock); + base += accum; + base = MIN (now, base); + *cur = gst_guint64_to_gdouble (now - base) * abs_rate + time; + + GST_DEBUG_OBJECT (basesink, + "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %" + GST_TIME_FORMAT " + time %" GST_TIME_FORMAT " = %" GST_TIME_FORMAT, + GST_TIME_ARGS (now), GST_TIME_ARGS (base), + GST_TIME_ARGS (accum), GST_TIME_ARGS (time), GST_TIME_ARGS (*cur)); + + gst_object_unref (clock); + + res = TRUE; } default: + /* cannot answer other than TIME */ break; } -beach: return res; + +wrong_state: + { + GST_OBJECT_UNLOCK (basesink); + return FALSE; + } } static gboolean @@ -1482,24 +1512,17 @@ gst_base_sink_query (GstElement * element, GstQuery * query) { gint64 cur = 0; GstFormat format; - gboolean eos; - GST_PAD_PREROLL_LOCK (basesink->sinkpad); - eos = basesink->eos; - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); + gst_query_parse_position (query, &format, NULL); - if (eos) { - res = gst_base_sink_peer_query (basesink, query); + GST_DEBUG_OBJECT (basesink, "current position format %d", format); + + /* first try to get the position based on the clock */ + if ((res = gst_base_sink_get_position (basesink, format, &cur))) { + gst_query_set_position (query, format, cur); } else { - gst_query_parse_position (query, &format, NULL); - - GST_DEBUG_OBJECT (basesink, "current position format %d", format); - - if ((res = gst_base_sink_get_position (basesink, format, &cur))) { - gst_query_set_position (query, format, cur); - } else { - res = gst_base_sink_peer_query (basesink, query); - } + /* fallback to peer query */ + res = gst_base_sink_peer_query (basesink, query); } break; } @@ -1550,50 +1573,31 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) /* need to complete preroll before this state change completes, there * is no data flow in READY so we can safely assume we need to preroll. */ basesink->offset = 0; - GST_PAD_PREROLL_LOCK (basesink->sinkpad); + GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll"); basesink->have_preroll = FALSE; - GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll to FALSE"); basesink->need_preroll = TRUE; - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); basesink->have_newsegment = FALSE; ret = GST_STATE_CHANGE_ASYNC; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING"); GST_PAD_PREROLL_LOCK (basesink->sinkpad); - /* no preroll needed */ - basesink->need_preroll = FALSE; - - /* if we have EOS, we should empty the queue now as there will - * be no more data received in the chain function. - * FIXME, this could block the state change function too long when - * we are pushing and syncing the buffers, better start a new - * thread to do this. */ - if (basesink->eos) { - gboolean do_eos = !basesink->eos_queued; - - gst_base_sink_preroll_queue_empty (basesink, basesink->sinkpad); - - /* need to post EOS message here if it was not in the preroll queue we - * just emptied. */ - if (do_eos) { + if (!gst_base_sink_is_prerolled (basesink)) { + ret = GST_STATE_CHANGE_ASYNC; + basesink->need_preroll = TRUE; + } else { + /* no preroll needed anymore now. */ + basesink->need_preroll = FALSE; + if (basesink->eos) { + /* need to post EOS message here */ GST_DEBUG_OBJECT (basesink, "Now posting EOS"); gst_element_post_message (GST_ELEMENT_CAST (basesink), gst_message_new_eos (GST_OBJECT_CAST (basesink))); } - } else if (!basesink->have_preroll) { - /* queue a commit_state */ - basesink->need_preroll = TRUE; - GST_DEBUG_OBJECT (basesink, - "PAUSED to PLAYING, !eos, !have_preroll, need preroll to TRUE"); - ret = GST_STATE_CHANGE_ASYNC; - /* we know it's not waiting, no need to signal */ - } else { - GST_DEBUG_OBJECT (basesink, - "PAUSED to PLAYING, !eos, have_preroll, need preroll to FALSE"); - /* now let it play */ - GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); } + GST_DEBUG_OBJECT (basesink, "signal preroll"); + GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; default: @@ -1610,36 +1614,24 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - { - GstBaseSinkClass *bclass; - - bclass = GST_BASE_SINK_GET_CLASS (basesink); - GST_PAD_PREROLL_LOCK (basesink->sinkpad); - GST_OBJECT_LOCK (basesink); - /* unlock clock wait if any */ + basesink->need_preroll = TRUE; if (basesink->clock_id) { gst_clock_id_unschedule (basesink->clock_id); } - GST_OBJECT_UNLOCK (basesink); - /* unlock any subclasses */ if (bclass->unlock) bclass->unlock (basesink); - /* if we don't have a preroll buffer and we have not received EOS, - * we need to wait for a preroll */ - GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d", - basesink->have_preroll, basesink->eos); - if (!basesink->have_preroll && !basesink->eos - && GST_STATE_PENDING (basesink) == GST_STATE_PAUSED) { - GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll to TRUE"); - basesink->need_preroll = TRUE; + GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, waiting for lock"); + /* if we don't have a preroll buffer we need to wait for a preroll and + * return ASYNC. */ + if (!gst_base_sink_is_prerolled (basesink)) { + GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll"); ret = GST_STATE_CHANGE_ASYNC; } GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; - } case GST_STATE_CHANGE_PAUSED_TO_READY: break; case GST_STATE_CHANGE_READY_TO_NULL: