From 0cab8b4fbc8ca8fe824322ee90a29fcf656c4eeb Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 30 Jan 2006 16:07:48 +0000 Subject: [PATCH] libs/gst/base/gstbasesink.c: Basesink cleanups, remove some old code. Original commit message from CVS: * libs/gst/base/gstbasesink.c: (gst_base_sink_init), (gst_base_sink_preroll_queue_empty), (gst_base_sink_commit_state), (gst_base_sink_handle_object), (gst_base_sink_event), (gst_base_sink_is_prerolled), (gst_base_sink_wait), (gst_base_sink_do_sync), (gst_base_sink_handle_event), (gst_base_sink_handle_buffer), (gst_base_sink_set_flushing), (gst_base_sink_deactivate), (gst_base_sink_activate), (gst_base_sink_activate_pull), (gst_base_sink_get_position), (gst_base_sink_query), (gst_base_sink_change_state): Basesink cleanups, remove some old code. Handle the case where a subclass can preroll in the render method (mostly audiosinks). Handle more events. Remove some locks around variables that are now protected with the PREROLL_LOCK (clock_id, flushing, ..). Optimize position query some more, do correct locking. Remove old code to push queue in state change, this is not needed anymore since preroll blocks on all prerollable items now. Almost implemented as described in design doc. --- ChangeLog | 23 ++ libs/gst/base/gstbasesink.c | 750 ++++++++++++++++++------------------ 2 files changed, 394 insertions(+), 379 deletions(-) diff --git a/ChangeLog b/ChangeLog index 539175ac26..6487b08948 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,26 @@ +2006-01-30 Wim Taymans + + * libs/gst/base/gstbasesink.c: (gst_base_sink_init), + (gst_base_sink_preroll_queue_empty), (gst_base_sink_commit_state), + (gst_base_sink_handle_object), (gst_base_sink_event), + (gst_base_sink_is_prerolled), (gst_base_sink_wait), + (gst_base_sink_do_sync), (gst_base_sink_handle_event), + (gst_base_sink_handle_buffer), (gst_base_sink_set_flushing), + (gst_base_sink_deactivate), (gst_base_sink_activate), + (gst_base_sink_activate_pull), (gst_base_sink_get_position), + (gst_base_sink_query), (gst_base_sink_change_state): + Basesink cleanups, remove some old code. + Handle the case where a subclass can preroll in the render + method (mostly audiosinks). + Handle more events. + Remove some locks around variables that are now protected + with the PREROLL_LOCK (clock_id, flushing, ..). + Optimize position query some more, do correct locking. + Remove old code to push queue in state change, this is not + needed anymore since preroll blocks on all prerollable items + now. + Almost implemented as described in design doc. + 2006-01-30 Wim Taymans * tests/check/gst/gstbin.c: (GST_START_TEST): diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index 78b309f35e..dfd14a8ab8 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -1,5 +1,5 @@ /* GStreamer - * Copyright (C) 2005 Wim Taymans + * Copyright (C) 2005,2006 Wim Taymans * * gstbasesink.c: Base class for sink elements * @@ -65,10 +65,16 @@ * After synchronisation the virtual method #GstBaseSink::render will be called. * Subclasses should minimally implement this method. * - * Upon receiving the EOS event in th PLAYING state, #GstBaseSink will wait for + * Since 0.10.3 subclasses that synchronize on the clock in the ::render method + * are supported as well. These classes typically receive a buffer in the render + * method and can then potentially block on the clock while rendering. A typical + * example would be an audiosink. + * + * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait for * the clock to reach the time indicated by the stop time of the last ::get_times * call before posting an EOS message. When the element receives EOS in PAUSED, - * the event is queued and an EOS message is posted when going to PLAYING. + * preroll completes, the event is queued and an EOS message is posted when going + * to PLAYING. * * #GstBaseSink will internally use the GST_EVENT_NEW_SEGMENT events to schedule * synchronisation and clipping of buffers. Buffers that fall completely outside @@ -78,7 +84,8 @@ * * #GstBaseSink will by default report the current playback position in * GST_FORMAT_TIME based on the current clock time and segment information. - * If the element is EOS however, the query will be forwarded upstream. + * If the element is EOS, PAUSED or no clock has been set on the element, the + * query will be forwarded upstream. * * The ::set_caps function will be called when the subclass should configure itself * to precess a specific media type. @@ -99,7 +106,7 @@ * operations they perform in the ::render method. This is mostly usefull when * the ::render method performs a blocking write on a file descripter. * - * Last reviewed on 2006-01-18 (0.10.0) + * Last reviewed on 2006-01-30 (0.10.3) */ #ifdef HAVE_CONFIG_H @@ -180,6 +187,8 @@ static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf); static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer, GstClockTime * start, GstClockTime * end); +static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink, + GstPad * pad, gboolean flushing); static GstStateChangeReturn gst_base_sink_change_state (GstElement * element, GstStateChange transition); @@ -335,7 +344,6 @@ gst_base_sink_init (GstBaseSink * basesink, gpointer g_class) gst_element_add_pad (GST_ELEMENT (basesink), basesink->sinkpad); basesink->pad_mode = GST_ACTIVATE_NONE; - GST_PAD_TASK (basesink->sinkpad) = NULL; basesink->preroll_queue = g_queue_new (); basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH; @@ -425,7 +433,7 @@ gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, return GST_FLOW_OK; } -/* with PREROLL_LOCK */ +/* with PREROLL_LOCK, STREAM_LOCK */ static GstFlowReturn gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad) { @@ -438,44 +446,25 @@ gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad) if (q) { GST_DEBUG_OBJECT (basesink, "emptying queue"); while ((obj = g_queue_pop_head (q))) { - gboolean is_buffer; + basesink->preroll_queued--; - is_buffer = GST_IS_BUFFER (obj); - if (G_LIKELY (is_buffer)) { - basesink->preroll_queued--; + if (G_LIKELY (GST_IS_BUFFER (obj))) { basesink->buffers_queued--; - } else { - switch (GST_EVENT_TYPE (obj)) { - case GST_EVENT_EOS: - basesink->preroll_queued--; - break; - default: - break; - } - basesink->events_queued--; - } - /* we release the preroll lock while pushing so that we - * can still flush it while blocking on the clock or - * inside the element. */ - GST_PAD_PREROLL_UNLOCK (pad); - - if (G_LIKELY (is_buffer)) { GST_DEBUG_OBJECT (basesink, "popped buffer %p", obj); ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER_CAST (obj)); } else { + basesink->events_queued--; GST_DEBUG_OBJECT (basesink, "popped event %p", obj); gst_base_sink_handle_event (basesink, GST_EVENT_CAST (obj)); ret = GST_FLOW_OK; } - - GST_PAD_PREROLL_LOCK (pad); } GST_DEBUG_OBJECT (basesink, "queue empty"); } return ret; } -/* with PREROLL_LOCK */ +/* with PREROLL_LOCK, STREAM_LOCK */ static void gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad) { @@ -500,74 +489,80 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad) GST_PAD_PREROLL_SIGNAL (pad); } -/* with PREROLL_LOCK */ +/* with PREROLL_LOCK, STREAM_LOCK */ static gboolean gst_base_sink_commit_state (GstBaseSink * basesink) { /* commit state and proceed to next pending state */ - { - GstState current, next, pending, post_pending; - GstMessage *message; - gboolean post_paused = FALSE; - gboolean post_playing = FALSE; + GstState current, next, pending, post_pending; + GstMessage *message; + gboolean post_paused = FALSE; + gboolean post_playing = FALSE; - GST_OBJECT_LOCK (basesink); - current = GST_STATE (basesink); - next = GST_STATE_NEXT (basesink); - pending = GST_STATE_PENDING (basesink); - post_pending = pending; + GST_OBJECT_LOCK (basesink); + current = GST_STATE (basesink); + next = GST_STATE_NEXT (basesink); + pending = GST_STATE_PENDING (basesink); + post_pending = pending; - switch (pending) { - case GST_STATE_PLAYING: - basesink->need_preroll = FALSE; - post_playing = TRUE; - /* post PAUSED too when we were READY */ - if (current == GST_STATE_READY) { - post_paused = TRUE; - } - break; - case GST_STATE_PAUSED: - basesink->need_preroll = TRUE; + switch (pending) { + case GST_STATE_PLAYING: + basesink->need_preroll = FALSE; + post_playing = TRUE; + /* post PAUSED too when we were READY */ + if (current == GST_STATE_READY) { post_paused = TRUE; - post_pending = GST_STATE_VOID_PENDING; - break; - case GST_STATE_READY: - goto stopping; - default: - break; - } - - if (pending != GST_STATE_VOID_PENDING) { - GST_STATE (basesink) = pending; - GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING; - GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING; - GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS; - } - GST_OBJECT_UNLOCK (basesink); - - if (post_paused) { - message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), - current, next, post_pending); - gst_element_post_message (GST_ELEMENT_CAST (basesink), message); - } - if (post_playing) { - message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), - next, pending, GST_STATE_VOID_PENDING); - gst_element_post_message (GST_ELEMENT_CAST (basesink), message); - } - /* and mark dirty */ - if (post_paused || post_playing) { - gst_element_post_message (GST_ELEMENT_CAST (basesink), - gst_message_new_state_dirty (GST_OBJECT_CAST (basesink))); - } - - GST_STATE_BROADCAST (basesink); + } + break; + case GST_STATE_PAUSED: + basesink->need_preroll = TRUE; + post_paused = TRUE; + post_pending = GST_STATE_VOID_PENDING; + break; + case GST_STATE_READY: + case GST_STATE_NULL: + goto stopping; + case GST_STATE_VOID_PENDING: + goto nothing_pending; + default: + break; } + + GST_STATE (basesink) = pending; + GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING; + GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING; + GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS; + GST_OBJECT_UNLOCK (basesink); + + if (post_paused) { + message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), + current, next, post_pending); + gst_element_post_message (GST_ELEMENT_CAST (basesink), message); + } + if (post_playing) { + message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink), + next, pending, GST_STATE_VOID_PENDING); + gst_element_post_message (GST_ELEMENT_CAST (basesink), message); + } + /* and mark dirty */ + if (post_paused || post_playing) { + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_state_dirty (GST_OBJECT_CAST (basesink))); + } + + GST_STATE_BROADCAST (basesink); + return TRUE; +nothing_pending: + { + GST_OBJECT_UNLOCK (basesink); + return TRUE; + } stopping: { /* app is going to READY */ + basesink->need_preroll = FALSE; GST_OBJECT_UNLOCK (basesink); return FALSE; } @@ -579,59 +574,21 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, GstMiniObject * obj) { gint length; - gboolean have_event; + gboolean have_buffer; GstFlowReturn ret; GST_PAD_PREROLL_LOCK (pad); + if (basesink->flushing) + goto in_flushing; + /* push object on the queue */ GST_DEBUG_OBJECT (basesink, "push %p on preroll_queue", obj); g_queue_push_tail (basesink->preroll_queue, obj); - have_event = GST_IS_EVENT (obj); - if (have_event) { - GstEvent *event = GST_EVENT (obj); - - switch (GST_EVENT_TYPE (obj)) { - case GST_EVENT_EOS: - basesink->preroll_queued++; - basesink->eos = TRUE; - basesink->eos_queued = TRUE; - break; - case GST_EVENT_NEWSEGMENT: - { - gboolean update; - gdouble rate; - GstFormat format; - gint64 start; - gint64 stop; - gint64 time; - - /* the newsegment event is needed to bring the buffer timestamps to the - * stream time and to drop samples outside of the playback segment. */ - gst_event_parse_new_segment (event, &update, &rate, &format, - &start, &stop, &time); - - basesink->have_newsegment = TRUE; - - gst_segment_set_newsegment (&basesink->segment, update, rate, format, - start, stop, time); - - GST_DEBUG_OBJECT (basesink, - "received NEWSEGMENT %" GST_TIME_FORMAT " -- %" - GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %" - GST_TIME_FORMAT, - GST_TIME_ARGS (basesink->segment.start), - GST_TIME_ARGS (basesink->segment.stop), - GST_TIME_ARGS (basesink->segment.time), - GST_TIME_ARGS (basesink->segment.accum)); - break; - } - default: - break; - } - basesink->events_queued++; - } else { - GstBuffer *buf = GST_BUFFER (obj); + have_buffer = GST_IS_BUFFER (obj); + if (G_LIKELY (have_buffer)) { + GstBuffer *buf = GST_BUFFER_CAST (obj); + GstClockTime start = -1, end = -1; if (!basesink->have_newsegment) { GST_ELEMENT_WARNING (basesink, STREAM, FAILED, @@ -644,26 +601,34 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, } /* check if the buffer needs to be dropped */ - if (TRUE) { - GstClockTime start = -1, end = -1; + /* we don't use the subclassed method as it may not return + * valid values for our purpose here */ + gst_base_sink_get_times (basesink, buf, &start, &end); - /* we don't use the subclassed method as it may not return - * valid values for our purpose here */ - gst_base_sink_get_times (basesink, buf, &start, &end); + GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT + ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end)); - GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT - ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), - GST_TIME_ARGS (end)); - - if (GST_CLOCK_TIME_IS_VALID (start) && - (basesink->segment.format == GST_FORMAT_TIME)) { - if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, - (gint64) start, (gint64) end, NULL, NULL)) - goto dropping; - } + if (GST_CLOCK_TIME_IS_VALID (start) && + (basesink->segment.format == GST_FORMAT_TIME)) { + if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, + (gint64) start, (gint64) end, NULL, NULL)) + goto out_of_segment; } - basesink->preroll_queued++; basesink->buffers_queued++; + basesink->preroll_queued++; + } else { + GstEvent *event = GST_EVENT_CAST (obj); + + switch (GST_EVENT_TYPE (event)) { + /* only EOS finishes preroll */ + case GST_EVENT_EOS: + basesink->preroll_queued++; + basesink->eos_queued = TRUE; + break; + default: + break; + } + basesink->events_queued++; } GST_DEBUG_OBJECT (basesink, @@ -672,17 +637,19 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, basesink->buffers_queued, basesink->events_queued); /* check if we are prerolling */ - if (!basesink->need_preroll) + if (G_LIKELY (!basesink->need_preroll)) goto no_preroll; - /* there is a buffer queued */ - if (basesink->buffers_queued == 1) { + length = basesink->preroll_queued; + GST_DEBUG_OBJECT (basesink, "prerolled length %d", length); + + if (length == 1) { GST_DEBUG_OBJECT (basesink, "do preroll %p", obj); /* if it's a buffer, we need to call the preroll method */ - if (GST_IS_BUFFER (obj)) { + if (G_LIKELY (have_buffer)) { GstBaseSinkClass *bclass; - GstBuffer *buf = GST_BUFFER (obj); + GstBuffer *buf = GST_BUFFER_CAST (obj); GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); @@ -692,71 +659,47 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK) goto preroll_failed; } - } - length = basesink->preroll_queued; - GST_DEBUG_OBJECT (basesink, "prerolled length %d", length); - - if (length == 1) { - - basesink->have_preroll = TRUE; /* commit state */ if (!gst_base_sink_commit_state (basesink)) goto stopping; - GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) - goto flushing; - GST_OBJECT_UNLOCK (pad); - /* it is possible that commiting the state made us go to PLAYING * now in which case we don't need to block anymore. */ - if (!basesink->need_preroll) + if (!basesink->need_preroll) { goto no_preroll; - - length = basesink->preroll_queued; - - /* FIXME: a pad probe could have made us lose the buffer, according - * to one of the python tests */ - if (length == 0) { - GST_ERROR_OBJECT (basesink, - "preroll_queued dropped from 1 to 0 while committing state change"); } - g_assert (length <= 1); } - /* see if we need to block now. We cannot block on events, only - * on buffers, the reason is that events can be sent from the - * application thread and we don't want to block there. */ - if (length <= basesink->preroll_queue_max_len || have_event) + /* see if we need to block now. */ + if (G_UNLIKELY (length <= basesink->preroll_queue_max_len)) goto more_preroll; /* block until the state changes, or we get a flush, or something */ GST_DEBUG_OBJECT (basesink, "waiting to finish preroll"); + basesink->have_preroll = TRUE; GST_PAD_PREROLL_WAIT (pad); + basesink->have_preroll = FALSE; GST_DEBUG_OBJECT (basesink, "done preroll"); - GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + if (G_UNLIKELY (basesink->flushing)) goto flushing; - GST_OBJECT_UNLOCK (pad); /* we can start rendering the data now */ no_preroll: GST_DEBUG_OBJECT (basesink, "no preroll needed"); - /* maybe it was another sink that blocked in preroll, need to check for - buffers to drain */ - basesink->have_preroll = FALSE; + /* render all our buffers now */ ret = gst_base_sink_preroll_queue_empty (basesink, pad); + GST_PAD_PREROLL_UNLOCK (pad); return ret; /* special cases */ -dropping: +out_of_segment: { GstBuffer *buf; - GST_DEBUG_OBJECT (basesink, "dropping buffer"); + GST_DEBUG_OBJECT (basesink, "dropping buffer, out of segment"); /* take the buffer off the queue again */ buf = GST_BUFFER (g_queue_pop_tail (basesink->preroll_queue)); @@ -765,12 +708,19 @@ dropping: return GST_FLOW_OK; } +in_flushing: + { + gst_mini_object_unref (obj); + GST_PAD_PREROLL_UNLOCK (pad); + GST_DEBUG_OBJECT (basesink, "sink is flushing"); + + return GST_FLOW_WRONG_STATE; + } flushing: { - GST_OBJECT_UNLOCK (pad); gst_base_sink_preroll_queue_flush (basesink, pad); GST_PAD_PREROLL_UNLOCK (pad); - GST_DEBUG_OBJECT (basesink, "pad is flushing"); + GST_DEBUG_OBJECT (basesink, "sink is flushing"); return GST_FLOW_WRONG_STATE; } @@ -825,58 +775,86 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) } case GST_EVENT_NEWSEGMENT: { - GstFlowReturn ret; + /* the new segment is a non prerollable item and does not block anything, + * we can just configure the current segment and return. */ + /* FIXME, if the preroll-queue-len > 1 we need to queue the newsegment + * and activate them before attempting the synchronize */ + gboolean update; + gdouble rate; + GstFormat format; + gint64 start; + gint64 stop; + gint64 time; - ret = - gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (event)); + /* the newsegment event is needed to bring the buffer timestamps to the + * stream time and to drop samples outside of the playback segment. */ + gst_event_parse_new_segment (event, &update, &rate, &format, + &start, &stop, &time); + + basesink->have_newsegment = TRUE; + + GST_OBJECT_LOCK (basesink); + gst_segment_set_newsegment (&basesink->segment, update, rate, format, + start, stop, time); + + GST_DEBUG_OBJECT (basesink, + "received NEWSEGMENT %" GST_TIME_FORMAT " -- %" + GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %" + GST_TIME_FORMAT, + GST_TIME_ARGS (basesink->segment.start), + GST_TIME_ARGS (basesink->segment.stop), + GST_TIME_ARGS (basesink->segment.time), + GST_TIME_ARGS (basesink->segment.accum)); + GST_OBJECT_UNLOCK (basesink); + + gst_event_unref (event); break; } case GST_EVENT_FLUSH_START: - /* make sure we are not blocked on the clock also clear any pending - * eos state. */ if (bclass->event) bclass->event (basesink, event); - GST_OBJECT_LOCK (basesink); - basesink->flushing = TRUE; - if (basesink->clock_id) { - gst_clock_id_unschedule (basesink->clock_id); - } - GST_OBJECT_UNLOCK (basesink); - - GST_PAD_PREROLL_LOCK (pad); - /* we need preroll after the flush */ - GST_DEBUG_OBJECT (basesink, "flushing, need preroll after flush"); - basesink->need_preroll = TRUE; - /* unlock from a possible state change/preroll */ - gst_base_sink_preroll_queue_flush (basesink, pad); - GST_PAD_PREROLL_UNLOCK (pad); + /* make sure we are not blocked on the clock also clear any pending + * eos state. */ + gst_base_sink_set_flushing (basesink, pad, TRUE); + /* we grab the stream lock but that is not needed since setting the + * sink to flushing would make sure no state commit is being done + * anymore */ + GST_PAD_STREAM_LOCK (pad); /* and we need to commit our state again on the next * prerolled buffer */ - GST_PAD_STREAM_LOCK (pad); gst_element_lost_state (GST_ELEMENT (basesink)); - GST_PAD_STREAM_UNLOCK (pad); GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); + GST_PAD_STREAM_UNLOCK (pad); + gst_event_unref (event); break; case GST_EVENT_FLUSH_STOP: if (bclass->event) bclass->event (basesink, event); - /* now we are completely unblocked and the _chain method - * will return */ - GST_OBJECT_LOCK (basesink); - basesink->flushing = FALSE; - GST_OBJECT_UNLOCK (basesink); + /* unset flushing so we can accept new data */ + gst_base_sink_set_flushing (basesink, pad, FALSE); + /* we need new segment info after the flush. */ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + basesink->have_newsegment = FALSE; GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); gst_event_unref (event); break; default: - gst_event_unref (event); + /* other events are sent to queue or subclass depending on if they + * are serialized. */ + if (GST_EVENT_IS_SERIALIZED (event)) { + gst_base_sink_handle_object (basesink, pad, + GST_MINI_OBJECT_CAST (event)); + } else { + if (bclass->event) + bclass->event (basesink, event); + gst_event_unref (event); + } break; } gst_object_unref (basesink); @@ -905,34 +883,37 @@ gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer, } } -/* with STREAM_LOCK and LOCK */ +/* must be called with PREROLL_LOCK */ +static gboolean +gst_base_sink_is_prerolled (GstBaseSink * basesink) +{ + gboolean res; + + res = basesink->have_preroll || basesink->eos; + GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => prerolled: %d", + basesink->have_preroll, basesink->eos, res); + return res; +} + +/* with STREAM_LOCK, PREROLL_LOCK, id should be a valid GstClockID */ static GstClockReturn -gst_base_sink_wait (GstBaseSink * basesink, GstClockTime time) +gst_base_sink_wait (GstBaseSink * basesink, GstClockID id) { GstClockReturn ret; - GstClockID id; - - /* no need to attempt a clock wait if we are flushing */ - if (basesink->flushing) { - return GST_CLOCK_UNSCHEDULED; - } - - /* clock_id should be NULL outside of this function */ - g_assert (basesink->clock_id == NULL); - g_assert (GST_CLOCK_TIME_IS_VALID (time)); - - id = gst_clock_new_single_shot_id (GST_ELEMENT_CLOCK (basesink), time); basesink->clock_id = id; - /* release the object lock while waiting */ - GST_OBJECT_UNLOCK (basesink); + /* release the preroll lock while waiting */ + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); ret = gst_clock_id_wait (id, NULL); - GST_OBJECT_LOCK (basesink); + GST_PAD_PREROLL_LOCK (basesink->sinkpad); gst_clock_id_unref (id); basesink->clock_id = NULL; + if (basesink->flushing) + ret = GST_CLOCK_UNSCHEDULED; + return ret; } @@ -944,6 +925,8 @@ gst_base_sink_wait (GstBaseSink * basesink, GstClockTime time) * the clock, save the entry so we can unlock it * 4) wait on the clock, this blocks * 5) unref the clockid again + * + * Is called with STREAM_LOCK */ static GstClockReturn gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) @@ -953,6 +936,8 @@ gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) gint64 cstart, cend; GstBaseSinkClass *bclass; GstClockTime base_time; + GstClock *clock; + GstClockID id; GstClockTimeDiff stream_start, stream_end; bclass = GST_BASE_SINK_GET_CLASS (basesink); @@ -981,36 +966,36 @@ gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) } /* save last valid times seen. */ - if (GST_CLOCK_TIME_IS_VALID (end)) + if (GST_CLOCK_TIME_IS_VALID (cend)) gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, - (gint64) end); + (gint64) cend); else gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, - (gint64) start); + (gint64) cstart); } else { /* no clipping for formats different from GST_FORMAT_TIME */ cstart = start; cend = end; } - GST_OBJECT_LOCK (basesink); - if (!basesink->sync) - goto no_sync; - - if (GST_ELEMENT_CLOCK (basesink) == NULL) - goto no_clock; - if (!((basesink->segment.format == GST_FORMAT_TIME) || (basesink->segment.accum == 0))) goto no_segment; + GST_OBJECT_LOCK (basesink); + if (!basesink->sync) + goto no_sync; + + if ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL) + goto no_clock; + /* now do clocking, LOCK is helt */ stream_start = gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cstart); - stream_end = - gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cend); base_time = GST_ELEMENT_CAST (basesink)->base_time; + id = gst_clock_new_single_shot_id (clock, stream_start + base_time); + GST_OBJECT_UNLOCK (basesink); GST_LOG_OBJECT (basesink, "waiting for clock, base time %" GST_TIME_FORMAT @@ -1019,14 +1004,15 @@ gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) /* also save end_time of this buffer so that we can wait * to signal EOS */ + stream_end = + gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cend); + if (GST_CLOCK_TIME_IS_VALID (stream_end)) basesink->end_time = stream_end + base_time; else basesink->end_time = GST_CLOCK_TIME_NONE; - result = gst_base_sink_wait (basesink, stream_start + base_time); - - GST_OBJECT_UNLOCK (basesink); + result = gst_base_sink_wait (basesink, id); GST_LOG_OBJECT (basesink, "clock entry done: %d", result); @@ -1041,7 +1027,8 @@ invalid_start: out_of_segment: { GST_LOG_OBJECT (basesink, "buffer skipped, not in segment"); - return GST_CLOCK_UNSCHEDULED; + /* we return BADTIME to make callers drop this buffer. */ + return GST_CLOCK_BADTIME; } no_sync: { @@ -1058,7 +1045,6 @@ no_clock: no_segment: { GST_DEBUG_OBJECT (basesink, "no segment info, can't sync"); - GST_OBJECT_UNLOCK (basesink); return GST_CLOCK_OK; } } @@ -1068,6 +1054,8 @@ no_segment: * * 2) render the event * 3) unref the event + * + * called with STREAM_LOCK, PREROLL_LOCK */ static inline gboolean gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) @@ -1077,16 +1065,30 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: + { + GstClock *clock; + gboolean unlock = TRUE; + GST_OBJECT_LOCK (basesink); - if (GST_ELEMENT_CLOCK (basesink)) { + basesink->eos = TRUE; + basesink->eos_queued = FALSE; + if ((clock = GST_ELEMENT_CLOCK (basesink)) != NULL) { /* wait for last buffer to finish if we have a valid end time */ if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) { - gst_base_sink_wait (basesink, basesink->end_time); + GstClockID id; + + id = gst_clock_new_single_shot_id (clock, basesink->end_time); + unlock = FALSE; + GST_OBJECT_UNLOCK (basesink); + + gst_base_sink_wait (basesink, id); basesink->end_time = GST_CLOCK_TIME_NONE; } } - GST_OBJECT_UNLOCK (basesink); + if (unlock) + GST_OBJECT_UNLOCK (basesink); break; + } default: break; } @@ -1099,16 +1101,13 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: - GST_PAD_PREROLL_LOCK (basesink->sinkpad); /* if we are still EOS, we can post the EOS message */ if (basesink->eos) { /* ok, now we can post the message */ GST_DEBUG_OBJECT (basesink, "Now posting EOS"); gst_element_post_message (GST_ELEMENT_CAST (basesink), gst_message_new_eos (GST_OBJECT_CAST (basesink))); - basesink->eos_queued = FALSE; } - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; default: break; @@ -1125,6 +1124,8 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) * 1) first sync on the buffer * 2) render the buffer * 3) unref the buffer + * + * called with STREAM_LOCK, PREROLL_LOCK */ static inline GstFlowReturn gst_base_sink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf) @@ -1146,6 +1147,10 @@ gst_base_sink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf) ret = bclass->render (basesink, buf); break; } + case GST_CLOCK_BADTIME: + /* when out of segment, should normally not happen */ + case GST_CLOCK_UNSCHEDULED: + /* unlocked by a state change */ default: GST_DEBUG_OBJECT (basesink, "clock returned %d, not rendering", status); break; @@ -1223,33 +1228,43 @@ paused: } } +static gboolean +gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad, + gboolean flushing) +{ + GST_PAD_PREROLL_LOCK (pad); + basesink->flushing = flushing; + if (flushing) { + GstBaseSinkClass *bclass; + + bclass = GST_BASE_SINK_GET_CLASS (basesink); + + /* step 1, unblock clock sync (if any) or any other blocking thing */ + basesink->need_preroll = TRUE; + if (basesink->clock_id) { + gst_clock_id_unschedule (basesink->clock_id); + } + + /* unlock any subclasses */ + if (bclass->unlock) + bclass->unlock (basesink); + + /* flush out the data thread if it's locked in finish_preroll */ + GST_DEBUG_OBJECT (basesink, + "flushing out data thread, need preroll to TRUE"); + gst_base_sink_preroll_queue_flush (basesink, pad); + } + GST_PAD_PREROLL_UNLOCK (pad); + + return TRUE; +} + static gboolean gst_base_sink_deactivate (GstBaseSink * basesink, GstPad * pad) { - gboolean result = FALSE; - GstBaseSinkClass *bclass; + gboolean result; - bclass = GST_BASE_SINK_GET_CLASS (basesink); - - /* step 1, unblock clock sync (if any) or any other blocking thing */ - GST_PAD_PREROLL_LOCK (pad); - GST_OBJECT_LOCK (basesink); - if (basesink->clock_id) { - gst_clock_id_unschedule (basesink->clock_id); - } - GST_OBJECT_UNLOCK (basesink); - - /* unlock any subclasses */ - if (bclass->unlock) - bclass->unlock (basesink); - - /* flush out the data thread if it's locked in finish_preroll */ - GST_DEBUG_OBJECT (basesink, - "flushing out data thread, need preroll to FALSE"); - basesink->need_preroll = FALSE; - gst_base_sink_preroll_queue_flush (basesink, pad); - GST_PAD_PREROLL_SIGNAL (pad); - GST_PAD_PREROLL_UNLOCK (pad); + gst_base_sink_set_flushing (basesink, pad, TRUE); /* step 2, make sure streaming finishes */ result = gst_pad_stop_task (pad); @@ -1267,6 +1282,8 @@ gst_base_sink_activate (GstPad * pad) GST_DEBUG_OBJECT (basesink, "Trying pull mode first"); + gst_base_sink_set_flushing (basesink, pad, FALSE); + if (basesink->can_activate_pull && gst_pad_check_pull_range (pad) && gst_pad_activate_pull (pad, TRUE)) { GST_DEBUG_OBJECT (basesink, "Success activating pull mode"); @@ -1281,6 +1298,7 @@ gst_base_sink_activate (GstPad * pad) if (!result) { GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode"); + gst_base_sink_set_flushing (basesink, pad, TRUE); } gst_object_unref (basesink); @@ -1341,8 +1359,11 @@ gst_base_sink_activate_pull (GstPad * pad, gboolean active) basesink->pad_mode = GST_ACTIVATE_NONE; } else { if (gst_pad_activate_pull (peer, TRUE)) { - basesink->have_newsegment = TRUE; + /* we mark we have a newsegment here because pull based + * mode works just fine without having a newsegment before the + * first buffer */ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + basesink->have_newsegment = TRUE; /* set the pad mode before starting the task so that it's in the correct state for the new thread... */ @@ -1366,7 +1387,6 @@ gst_base_sink_activate_pull (GstPad * pad, gboolean active) g_warning ("Internal GStreamer activation error!!!"); result = FALSE; } else { - basesink->have_newsegment = FALSE; result = gst_base_sink_deactivate (basesink, pad); basesink->pad_mode = GST_ACTIVATE_NONE; } @@ -1419,55 +1439,65 @@ gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format, switch (format) { case GST_FORMAT_TIME: { + GstClockTime now, base; + gint64 time, accum; + gdouble abs_rate; + /* we can answer time format */ GST_OBJECT_LOCK (basesink); + /* can only give answer if not EOS */ + if (G_UNLIKELY (basesink->eos)) + goto wrong_state; + /* We get position from clock only in PLAYING */ - if (GST_STATE (basesink) != GST_STATE_PLAYING) { - GST_OBJECT_UNLOCK (basesink); - goto beach; - } + if (GST_STATE (basesink) != GST_STATE_PLAYING) + goto wrong_state; - if ((clock = GST_ELEMENT_CLOCK (basesink))) { - GstClockTime now, base; - gint64 time; + /* and we need a clock */ + if ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL) + goto wrong_state; - gst_object_ref (clock); - GST_OBJECT_UNLOCK (basesink); + /* collect all data we need holding the lock */ + if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time)) + time = basesink->segment.time; + else + time = 0; - now = gst_clock_get_time (clock); + base = GST_ELEMENT_CAST (basesink)->base_time; + accum = basesink->segment.accum; + abs_rate = basesink->segment.abs_rate; - GST_OBJECT_LOCK (basesink); - if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time)) - time = basesink->segment.time; - else - time = 0; - - base = GST_ELEMENT_CAST (basesink)->base_time; - base += basesink->segment.accum; - base = MIN (now, base); - *cur = - gst_guint64_to_gdouble (now - base) * basesink->segment.abs_rate + - time; - - GST_DEBUG_OBJECT (basesink, - "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %" - GST_TIME_FORMAT " + time %" GST_TIME_FORMAT " = %" GST_TIME_FORMAT, - GST_TIME_ARGS (now), GST_TIME_ARGS (base), - GST_TIME_ARGS (basesink->segment.accum), GST_TIME_ARGS (time), - GST_TIME_ARGS (*cur)); - - gst_object_unref (clock); - - res = TRUE; - } + gst_object_ref (clock); + /* need to release the object lock before we can get the time */ GST_OBJECT_UNLOCK (basesink); + + now = gst_clock_get_time (clock); + base += accum; + base = MIN (now, base); + *cur = gst_guint64_to_gdouble (now - base) * abs_rate + time; + + GST_DEBUG_OBJECT (basesink, + "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %" + GST_TIME_FORMAT " + time %" GST_TIME_FORMAT " = %" GST_TIME_FORMAT, + GST_TIME_ARGS (now), GST_TIME_ARGS (base), + GST_TIME_ARGS (accum), GST_TIME_ARGS (time), GST_TIME_ARGS (*cur)); + + gst_object_unref (clock); + + res = TRUE; } default: + /* cannot answer other than TIME */ break; } -beach: return res; + +wrong_state: + { + GST_OBJECT_UNLOCK (basesink); + return FALSE; + } } static gboolean @@ -1482,24 +1512,17 @@ gst_base_sink_query (GstElement * element, GstQuery * query) { gint64 cur = 0; GstFormat format; - gboolean eos; - GST_PAD_PREROLL_LOCK (basesink->sinkpad); - eos = basesink->eos; - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); + gst_query_parse_position (query, &format, NULL); - if (eos) { - res = gst_base_sink_peer_query (basesink, query); + GST_DEBUG_OBJECT (basesink, "current position format %d", format); + + /* first try to get the position based on the clock */ + if ((res = gst_base_sink_get_position (basesink, format, &cur))) { + gst_query_set_position (query, format, cur); } else { - gst_query_parse_position (query, &format, NULL); - - GST_DEBUG_OBJECT (basesink, "current position format %d", format); - - if ((res = gst_base_sink_get_position (basesink, format, &cur))) { - gst_query_set_position (query, format, cur); - } else { - res = gst_base_sink_peer_query (basesink, query); - } + /* fallback to peer query */ + res = gst_base_sink_peer_query (basesink, query); } break; } @@ -1550,50 +1573,31 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) /* need to complete preroll before this state change completes, there * is no data flow in READY so we can safely assume we need to preroll. */ basesink->offset = 0; - GST_PAD_PREROLL_LOCK (basesink->sinkpad); + GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll"); basesink->have_preroll = FALSE; - GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll to FALSE"); basesink->need_preroll = TRUE; - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); basesink->have_newsegment = FALSE; ret = GST_STATE_CHANGE_ASYNC; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING"); GST_PAD_PREROLL_LOCK (basesink->sinkpad); - /* no preroll needed */ - basesink->need_preroll = FALSE; - - /* if we have EOS, we should empty the queue now as there will - * be no more data received in the chain function. - * FIXME, this could block the state change function too long when - * we are pushing and syncing the buffers, better start a new - * thread to do this. */ - if (basesink->eos) { - gboolean do_eos = !basesink->eos_queued; - - gst_base_sink_preroll_queue_empty (basesink, basesink->sinkpad); - - /* need to post EOS message here if it was not in the preroll queue we - * just emptied. */ - if (do_eos) { + if (!gst_base_sink_is_prerolled (basesink)) { + ret = GST_STATE_CHANGE_ASYNC; + basesink->need_preroll = TRUE; + } else { + /* no preroll needed anymore now. */ + basesink->need_preroll = FALSE; + if (basesink->eos) { + /* need to post EOS message here */ GST_DEBUG_OBJECT (basesink, "Now posting EOS"); gst_element_post_message (GST_ELEMENT_CAST (basesink), gst_message_new_eos (GST_OBJECT_CAST (basesink))); } - } else if (!basesink->have_preroll) { - /* queue a commit_state */ - basesink->need_preroll = TRUE; - GST_DEBUG_OBJECT (basesink, - "PAUSED to PLAYING, !eos, !have_preroll, need preroll to TRUE"); - ret = GST_STATE_CHANGE_ASYNC; - /* we know it's not waiting, no need to signal */ - } else { - GST_DEBUG_OBJECT (basesink, - "PAUSED to PLAYING, !eos, have_preroll, need preroll to FALSE"); - /* now let it play */ - GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); } + GST_DEBUG_OBJECT (basesink, "signal preroll"); + GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; default: @@ -1610,36 +1614,24 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - { - GstBaseSinkClass *bclass; - - bclass = GST_BASE_SINK_GET_CLASS (basesink); - GST_PAD_PREROLL_LOCK (basesink->sinkpad); - GST_OBJECT_LOCK (basesink); - /* unlock clock wait if any */ + basesink->need_preroll = TRUE; if (basesink->clock_id) { gst_clock_id_unschedule (basesink->clock_id); } - GST_OBJECT_UNLOCK (basesink); - /* unlock any subclasses */ if (bclass->unlock) bclass->unlock (basesink); - /* if we don't have a preroll buffer and we have not received EOS, - * we need to wait for a preroll */ - GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d", - basesink->have_preroll, basesink->eos); - if (!basesink->have_preroll && !basesink->eos - && GST_STATE_PENDING (basesink) == GST_STATE_PAUSED) { - GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll to TRUE"); - basesink->need_preroll = TRUE; + GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, waiting for lock"); + /* if we don't have a preroll buffer we need to wait for a preroll and + * return ASYNC. */ + if (!gst_base_sink_is_prerolled (basesink)) { + GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll"); ret = GST_STATE_CHANGE_ASYNC; } GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; - } case GST_STATE_CHANGE_PAUSED_TO_READY: break; case GST_STATE_CHANGE_READY_TO_NULL: