From be1f48de3a82ee740eafaf6993e6eb20d5b13be2 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 2 Feb 2006 12:07:48 +0000 Subject: [PATCH] docs/design/part-element-sink.txt: Updated document. Original commit message from CVS: * docs/design/part-element-sink.txt: Updated document. * libs/gst/base/gstbasesink.c: (gst_base_sink_init), (gst_base_sink_finalize), (gst_base_sink_preroll_queue_flush), (gst_base_sink_configure_segment), (gst_base_sink_commit_state), (gst_base_sink_get_sync_times), (gst_base_sink_wait_clock), (gst_base_sink_do_sync), (gst_base_sink_render_object), (gst_base_sink_preroll_object), (gst_base_sink_queue_object_unlocked), (gst_base_sink_queue_object), (gst_base_sink_event), (gst_base_sink_chain_unlocked), (gst_base_sink_chain), (gst_base_sink_loop), (gst_base_sink_activate_pull), (gst_base_sink_get_position), (gst_base_sink_change_state): * libs/gst/base/gstbasesink.h: Totally refactored matching the design doc. Use two segments, one to clip incomming buffers and another to perform sync. Handle queueing correctly, bypass the queue when playing. Make EOS cancelable. Handle errors correctly when operating in pull based mode. * tests/check/elements/fakesink.c: (GST_START_TEST), (fakesink_suite): Added new check for sinks. --- ChangeLog | 28 + docs/design/part-element-sink.txt | 246 +++++-- libs/gst/base/gstbasesink.c | 1113 ++++++++++++++++------------- libs/gst/base/gstbasesink.h | 10 +- tests/check/elements/fakesink.c | 78 +- 5 files changed, 912 insertions(+), 563 deletions(-) diff --git a/ChangeLog b/ChangeLog index 55823eb580..0ddf614928 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,31 @@ +2006-02-02 Wim Taymans + + * docs/design/part-element-sink.txt: + Updated document. + + * libs/gst/base/gstbasesink.c: (gst_base_sink_init), + (gst_base_sink_finalize), (gst_base_sink_preroll_queue_flush), + (gst_base_sink_configure_segment), (gst_base_sink_commit_state), + (gst_base_sink_get_sync_times), (gst_base_sink_wait_clock), + (gst_base_sink_do_sync), (gst_base_sink_render_object), + (gst_base_sink_preroll_object), + (gst_base_sink_queue_object_unlocked), + (gst_base_sink_queue_object), (gst_base_sink_event), + (gst_base_sink_chain_unlocked), (gst_base_sink_chain), + (gst_base_sink_loop), (gst_base_sink_activate_pull), + (gst_base_sink_get_position), (gst_base_sink_change_state): + * libs/gst/base/gstbasesink.h: + Totally refactored matching the design doc. + Use two segments, one to clip incomming buffers and another to + perform sync. + Handle queueing correctly, bypass the queue when playing. + Make EOS cancelable. + Handle errors correctly when operating in pull based mode. + + * tests/check/elements/fakesink.c: (GST_START_TEST), + (fakesink_suite): + Added new check for sinks. + 2006-02-02 Wim Taymans * gst/gstsegment.c: (gst_segment_clip): diff --git a/docs/design/part-element-sink.txt b/docs/design/part-element-sink.txt index 686346354f..cf80eb0043 100644 --- a/docs/design/part-element-sink.txt +++ b/docs/design/part-element-sink.txt @@ -31,96 +31,170 @@ Events other than EOS do not complete the preroll stage. sink overview ------------- - /* commit the state. We return TRUE if we can continue - * streaming, FALSE in the case we go to a READY or NULL state. - * if we go to PLAYING, we don't need to block on preroll. - */ + - TODO: PREROLL_LOCK can be removed and we can safely use the STREAM_LOCK. + + + + # commit the state. We return TRUE if we can continue + # streaming, FALSE in the case we go to a READY or NULL state. + # if we go to PLAYING, we don't need to block on preroll. commit + { LOCK - switch (pending) { + switch (pending) case PLAYING: - need_preroll = FALSE; - break; + need_preroll = FALSE + break case PAUSED: - break; + break case READY: case NULL: - return FALSE; + return FALSE case VOID: - return TRUE; - } - /* update state */ - state = pending; - next = VOID; - pending = VOID; - UNLOCK - return TRUE; + return TRUE - /* handle a prerollable item (EOS or buffer). It is - * always called with the PREROLL_LOCK helt. - * need_preroll indicates that we must perform, commit and - * potentially block on preroll. - */ - handle (time) + # update state + state = pending + next = VOID + pending = VOID + UNLOCK + return TRUE + } + + # sync an object. We have to wait for the element to reach + # the PLAYING state before we can wait on the clock. + # some items do not need synchronisation (most events) so the + # get_times method returns FALSE (not syncable) + # need_preroll indicates that we are not in the PLAYING state + # and therefore need to commit and potentially block on preroll + # if our clock_wait got interrupted we commit and block again. + # The reason for this is that the current item being rendered is + # not yet finished and we can use that item to finish preroll. + do_sync (obj) + { + # get timing information for this object + syncable = get_times (obj, &start, &stop) + if (!syncable) + return OK; again: - while (need_preroll) { - preroll - if (!commit) - return WRONG_STATE - /* commit could have made us not need preroll anymore. */ - if (need_preroll) { - /* release PREROLL_LOCK and wait. prerolled can be observed - * and will be TRUE */ - prerolled = TRUE; + while (need_preroll) + if (need_commit) + need_commit = FALSE + if (!commit) + return WRONG_STATE + + if (need_preroll) + # release PREROLL_LOCK and wait. prerolled can be observed + # and will be TRUE + prerolled = TRUE PREROLL_WAIT (releasing PREROLL_LOCK) - prerolled = FALSE; + prerolled = FALSE if (flushing) return WRONG_STATE - } - } - if (clock && sync) { - /* the only way we can regain the prerolled state is when - * the clock entry gets unscheduled, we then preroll (again) on the - * current item, else we render and preroll on the next buffer. */ + + if (valid (start || stop)) PREROLL_UNLOCK - ret = wait_clock; + end_time = stop + ret = wait_clock (obj,start) PREROLL_LOCK - if (flushing) | /* sinks that sync on buffer contents do like this */ - return WRONG_STATE | while (more_to_render) { - if (ret == UNSCHEDULED) | ret = render - goto again; | if (ret == interrupted) - } | prerolled = TRUE; - render ----->| PREROLL_WAIT (releasing PREROLL_LOCK) - | prerolled = FALSE; + if (flushing) + return WRONG_STATE + # if the clock was unscheduled, we redo the + # preroll + if (ret == UNSCHEDULED) + goto again + } + + # render a prerollable item (EOS or buffer). It is + # always called with the PREROLL_LOCK helt. + render_object (obj) + { + ret = do_sync (obj) + if (ret != OK) + return ret; + + # preroll and syncing done, now we can render + render(obj) + } + | # sinks that sync on buffer contents do like this + | while (more_to_render) + | ret = render + | if (ret == interrupted) + | prerolled = TRUE + render (buffer) ----->| PREROLL_WAIT (releasing PREROLL_LOCK) + | prerolled = FALSE | if (flushing) | return WRONG_STATE - | } - /* various event functions */ + | + + # queue a prerollable item (EOS or buffer). It is + # always called with the PREROLL_LOCK helt. + # This function will commit the state when receiving the + # first prerollable item. + # items are then added to the rendering queue or rendered + # right away if no preroll is needed. + queue (obj, prerollable) + { + if (prerollable) + queuelen++ + + if (need_preroll) + # first item in the queue while we need preroll + # will complete state change and call preroll + if (queuelen == 1) + preroll (obj) + if (need_commit) + need_commit = FALSE + if (!commit) + return WRONG_STATE + + # then see if we need more preroll items before we + # can block + if (need_preroll) + if (queuelen <= maxqueue) + queue.add (obj) + return OK + + # now clear the queue and render each item before + # rendering the current item. + while (queue.hasItem) + render_object (queue.remove()) + + render_object (obj) + queuelen = 0 + } + + # various event functions event EOS: + # events must complete preroll too STREAM_LOCK PREROLL_LOCK if (flushing) return FALSE - ret = handle (end_time); + ret = queue (event, TRUE) if (ret == WRONG_STATE) return FALSE - post_eos - eos = TRUE; PREROLL_UNLOCK STREAM_UNLOCK - break; + break NEWSEGMENT: + # the newsegment must be used to clip incomming + # buffers. Then then go into the queue as non-prerollable + # items used for syncing the buffers STREAM_LOCK PREROLL_LOCK if (flushing) return FALSE set_clip - event + ret = queue (event, FALSE) + if (ret == WRONG_STATE) + return FALSE PREROLL_UNLOCK STREAM_UNLOCK - break; + break FLUSH_START: + # set flushing and unblock all that is waiting event ----> subclasses can interrupt render PREROLL_LOCK flushing = TRUE @@ -130,67 +204,89 @@ sink overview STREAM_LOCK lost_state STREAM_UNLOCK - break; + break FLUSH_END: + # unset flushing and clear all data and eos STREAM_LOCK event PREROLL_LOCK + queue.clear + queuelen = 0 flushing = FALSE - eos = FALSE; + eos = FALSE PREROLL_UNLOCK STREAM_UNLOCK - break; + break + # the chain function checks the buffer falls within the + # configured segment and queues the buffer for preroll and + # rendering chain STREAM_LOCK PREROLL_LOCK if (flushing) return WRONG_STATE if (clip) - handle (time); + queue (buffer, TRUE) PREROLL_UNLOCK STREAM_UNLOCK state switch (transition) READY_PAUSED: - ret = ASYNC; + # no datapassing is going on so we always return ASYNC + ret = ASYNC + need_commit = TRUE eos = FALSE flushing = FALSE - need_preroll = TRUE; - prerolled = FALSE; - break; + need_preroll = TRUE + prerolled = FALSE + break PAUSED_PLAYING: + # we grab the preroll lock. This we can only do if the + # chain function is either doing some clock sync, we are + # waiting for preroll or the chain function is not being called. PREROLL_LOCK if (prerolled || eos) - PREROLL_SIGNAL - ret = OK; - need_preroll = FALSE; + ret = OK + need_commit = FALSE + need_preroll = FALSE if (eos) post_eos + else + PREROLL_SIGNAL else - need_preroll = TRUE; - ret = ASYNC; + need_preroll = TRUE + need_commit = TRUE + ret = ASYNC PREROLL_UNLOCK - break; + break PLAYING_PAUSED: ---> subclass can interrupt render + # we grab the preroll lock. This we can only do if the + # chain function is either doing some clock sync + # or the chain function is not being called. PREROLL_LOCK - need_preroll = TRUE; + need_preroll = TRUE unlock_clock if (prerolled || eos) - ret = OK; + ret = OK else - ret = ASYNC; + ret = ASYNC PREROLL_UNLOCK - break; + break PAUSED_READY: ---> subclass can interrupt render + # we grab the preroll lock. Set to flushing and unlock + # everything. This should exit the chain functions and stop + # streaming. PREROLL_LOCK flushing = TRUE unlock_clock + queue.clear + queuelen = 0 PREROLL_SIGNAL - ret = OK; + ret = OK PREROLL_UNLOCK - break; + break diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index dfd14a8ab8..263f228e84 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -199,10 +199,6 @@ static gboolean gst_base_sink_activate (GstPad * pad); static gboolean gst_base_sink_activate_push (GstPad * pad, gboolean active); static gboolean gst_base_sink_activate_pull (GstPad * pad, gboolean active); static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event); -static inline GstFlowReturn gst_base_sink_handle_buffer (GstBaseSink * basesink, - GstBuffer * buf); -static inline gboolean gst_base_sink_handle_event (GstBaseSink * basesink, - GstEvent * event); static void gst_base_sink_base_init (gpointer g_class) @@ -341,10 +337,11 @@ gst_base_sink_init (GstBaseSink * basesink, gpointer g_class) GST_DEBUG_FUNCPTR (gst_base_sink_event)); gst_pad_set_chain_function (basesink->sinkpad, GST_DEBUG_FUNCPTR (gst_base_sink_chain)); - gst_element_add_pad (GST_ELEMENT (basesink), basesink->sinkpad); + gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad); basesink->pad_mode = GST_ACTIVATE_NONE; basesink->preroll_queue = g_queue_new (); + basesink->abidata.ABI.clip_segment = gst_segment_new (); basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH; basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL; @@ -362,6 +359,7 @@ gst_base_sink_finalize (GObject * object) basesink = GST_BASE_SINK (object); g_queue_free (basesink->preroll_queue); + gst_segment_free (basesink->abidata.ABI.clip_segment); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -433,50 +431,16 @@ gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, return GST_FLOW_OK; } -/* with PREROLL_LOCK, STREAM_LOCK */ -static GstFlowReturn -gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad) -{ - GstMiniObject *obj; - GQueue *q = basesink->preroll_queue; - GstFlowReturn ret; - - ret = GST_FLOW_OK; - - if (q) { - GST_DEBUG_OBJECT (basesink, "emptying queue"); - while ((obj = g_queue_pop_head (q))) { - basesink->preroll_queued--; - - if (G_LIKELY (GST_IS_BUFFER (obj))) { - basesink->buffers_queued--; - GST_DEBUG_OBJECT (basesink, "popped buffer %p", obj); - ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER_CAST (obj)); - } else { - basesink->events_queued--; - GST_DEBUG_OBJECT (basesink, "popped event %p", obj); - gst_base_sink_handle_event (basesink, GST_EVENT_CAST (obj)); - ret = GST_FLOW_OK; - } - } - GST_DEBUG_OBJECT (basesink, "queue empty"); - } - return ret; -} - /* with PREROLL_LOCK, STREAM_LOCK */ static void gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad) { GstMiniObject *obj; - GQueue *q = basesink->preroll_queue; GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink); - if (q) { - while ((obj = g_queue_pop_head (q))) { - GST_DEBUG_OBJECT (basesink, "popped %p", obj); - gst_mini_object_unref (obj); - } + while ((obj = g_queue_pop_head (basesink->preroll_queue))) { + GST_DEBUG_OBJECT (basesink, "popped %p", obj); + gst_mini_object_unref (obj); } /* we can't have EOS anymore now */ basesink->eos = FALSE; @@ -489,6 +453,36 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad) GST_PAD_PREROLL_SIGNAL (pad); } +/* with STREAM_LOCK, configures given segment with the event information. */ +static void +gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad, + GstEvent * event, GstSegment * segment) +{ + gboolean update; + gdouble rate; + GstFormat format; + gint64 start; + gint64 stop; + gint64 time; + + /* the newsegment event is needed to bring the buffer timestamps to the + * stream time and to drop samples outside of the playback segment. */ + gst_event_parse_new_segment (event, &update, &rate, &format, + &start, &stop, &time); + + GST_OBJECT_LOCK (basesink); + gst_segment_set_newsegment (segment, update, rate, format, start, stop, time); + + GST_DEBUG_OBJECT (basesink, + "configured NEWSEGMENT %" GST_TIME_FORMAT " -- %" + GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %" + GST_TIME_FORMAT, + GST_TIME_ARGS (segment->start), + GST_TIME_ARGS (segment->stop), + GST_TIME_ARGS (segment->time), GST_TIME_ARGS (segment->accum)); + GST_OBJECT_UNLOCK (basesink); +} + /* with PREROLL_LOCK, STREAM_LOCK */ static gboolean gst_base_sink_commit_state (GstBaseSink * basesink) @@ -507,6 +501,7 @@ gst_base_sink_commit_state (GstBaseSink * basesink) switch (pending) { case GST_STATE_PLAYING: + GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING"); basesink->need_preroll = FALSE; post_playing = TRUE; /* post PAUSED too when we were READY */ @@ -515,7 +510,7 @@ gst_base_sink_commit_state (GstBaseSink * basesink) } break; case GST_STATE_PAUSED: - basesink->need_preroll = TRUE; + GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED"); post_paused = TRUE; post_pending = GST_STATE_VOID_PENDING; break; @@ -556,200 +551,513 @@ gst_base_sink_commit_state (GstBaseSink * basesink) nothing_pending: { + GST_DEBUG_OBJECT (basesink, "nothing to commit"); GST_OBJECT_UNLOCK (basesink); return TRUE; } stopping: { /* app is going to READY */ + GST_DEBUG_OBJECT (basesink, "stopping"); basesink->need_preroll = FALSE; + basesink->flushing = TRUE; GST_OBJECT_UNLOCK (basesink); return FALSE; } } -/* with STREAM_LOCK */ -static GstFlowReturn -gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, - GstMiniObject * obj) + +/* with STREAM_LOCK, PREROLL_LOCK + * + * Returns TRUE if the object needs synchronisation and takes therefore + * part in prerolling. + * + * start and stop cannot be NULL. + */ +static gboolean +gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj, + GstClockTime * start, GstClockTime * stop) { - gint length; - gboolean have_buffer; - GstFlowReturn ret; + GstClockTime sstart, sstop; + gint64 cstart, cstop; + GstBaseSinkClass *bclass; + GstBuffer *buffer; - GST_PAD_PREROLL_LOCK (pad); - if (basesink->flushing) - goto in_flushing; - - /* push object on the queue */ - GST_DEBUG_OBJECT (basesink, "push %p on preroll_queue", obj); - g_queue_push_tail (basesink->preroll_queue, obj); - - have_buffer = GST_IS_BUFFER (obj); - if (G_LIKELY (have_buffer)) { - GstBuffer *buf = GST_BUFFER_CAST (obj); - GstClockTime start = -1, end = -1; - - if (!basesink->have_newsegment) { - GST_ELEMENT_WARNING (basesink, STREAM, FAILED, - (_("Internal data flow problem.")), - ("Received buffer without a new-segment. Cannot sync to clock.")); - basesink->have_newsegment = TRUE; - /* this means this sink will not be able to sync to the clock */ - basesink->segment.start = -1; - basesink->segment.stop = -1; - } - - /* check if the buffer needs to be dropped */ - /* we don't use the subclassed method as it may not return - * valid values for our purpose here */ - gst_base_sink_get_times (basesink, buf, &start, &end); - - GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT - ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end)); - - if (GST_CLOCK_TIME_IS_VALID (start) && - (basesink->segment.format == GST_FORMAT_TIME)) { - if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, - (gint64) start, (gint64) end, NULL, NULL)) - goto out_of_segment; - } - basesink->buffers_queued++; - basesink->preroll_queued++; - } else { + if (G_UNLIKELY (GST_IS_EVENT (obj))) { GstEvent *event = GST_EVENT_CAST (obj); switch (GST_EVENT_TYPE (event)) { - /* only EOS finishes preroll */ + /* EOS event needs syncing */ case GST_EVENT_EOS: - basesink->preroll_queued++; - basesink->eos_queued = TRUE; - break; + *start = basesink->end_time; + *stop = -1; + return TRUE; + /* other events do not need syncing */ + /* FIXME, maybe NEWSEGMENT might need synchronisation + * since the POSITION query depends on accumulated times + */ default: - break; - } - basesink->events_queued++; - } - - GST_DEBUG_OBJECT (basesink, - "now %d preroll, %d buffers, %d events on queue", - basesink->preroll_queued, - basesink->buffers_queued, basesink->events_queued); - - /* check if we are prerolling */ - if (G_LIKELY (!basesink->need_preroll)) - goto no_preroll; - - length = basesink->preroll_queued; - GST_DEBUG_OBJECT (basesink, "prerolled length %d", length); - - if (length == 1) { - GST_DEBUG_OBJECT (basesink, "do preroll %p", obj); - - /* if it's a buffer, we need to call the preroll method */ - if (G_LIKELY (have_buffer)) { - GstBaseSinkClass *bclass; - GstBuffer *buf = GST_BUFFER_CAST (obj); - - GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT, - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); - - bclass = GST_BASE_SINK_GET_CLASS (basesink); - if (bclass->preroll) - if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK) - goto preroll_failed; - } - - /* commit state */ - if (!gst_base_sink_commit_state (basesink)) - goto stopping; - - /* it is possible that commiting the state made us go to PLAYING - * now in which case we don't need to block anymore. */ - if (!basesink->need_preroll) { - goto no_preroll; + return FALSE; } } - /* see if we need to block now. */ - if (G_UNLIKELY (length <= basesink->preroll_queue_max_len)) - goto more_preroll; + /* else do buffer sync code */ + buffer = GST_BUFFER_CAST (obj); - /* block until the state changes, or we get a flush, or something */ - GST_DEBUG_OBJECT (basesink, "waiting to finish preroll"); - basesink->have_preroll = TRUE; - GST_PAD_PREROLL_WAIT (pad); - basesink->have_preroll = FALSE; - GST_DEBUG_OBJECT (basesink, "done preroll"); - if (G_UNLIKELY (basesink->flushing)) - goto flushing; + bclass = GST_BASE_SINK_GET_CLASS (basesink); - /* we can start rendering the data now */ -no_preroll: - GST_DEBUG_OBJECT (basesink, "no preroll needed"); - /* render all our buffers now */ - ret = gst_base_sink_preroll_queue_empty (basesink, pad); + sstart = sstop = -1; + if (bclass->get_times) + bclass->get_times (basesink, buffer, &sstart, &sstop); - GST_PAD_PREROLL_UNLOCK (pad); + GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT + ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (sstart), + GST_TIME_ARGS (sstop)); - return ret; + if (G_LIKELY (basesink->segment.format == GST_FORMAT_TIME)) { + /* clip */ + if (G_UNLIKELY (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, + (gint64) sstart, (gint64) sstop, &cstart, &cstop))) + goto out_of_segment; + + if (G_UNLIKELY (sstart != cstart || sstop != cstop)) { + GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT + ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart), + GST_TIME_ARGS (cstop)); + } + + /* save last valid times seen. */ + if (GST_CLOCK_TIME_IS_VALID (cstop)) + gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, + (gint64) cstop); + else + gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, + (gint64) cstart); + } else { + if (basesink->segment.accum == 0) { + /* no clipping for formats different from GST_FORMAT_TIME */ + cstart = sstart; + cstop = sstop; + } else { + cstart = -1; + cstop = -1; + } + } + + *start = + gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cstart); + *stop = + gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cstop); + + /* buffers always need syncing and preroll */ + return TRUE; /* special cases */ out_of_segment: { - GstBuffer *buf; - - GST_DEBUG_OBJECT (basesink, "dropping buffer, out of segment"); - /* take the buffer off the queue again */ - buf = GST_BUFFER (g_queue_pop_tail (basesink->preroll_queue)); - - gst_buffer_unref (buf); - GST_PAD_PREROLL_UNLOCK (pad); - - return GST_FLOW_OK; + /* should not happen, we return FALSE so that we don't try to + * sync on it. */ + GST_ELEMENT_WARNING (basesink, STREAM, FAILED, + (NULL), ("unexpected buffer out of segment found.")); + GST_LOG_OBJECT (basesink, "buffer skipped, not in segment"); + return FALSE; } -in_flushing: - { - gst_mini_object_unref (obj); - GST_PAD_PREROLL_UNLOCK (pad); - GST_DEBUG_OBJECT (basesink, "sink is flushing"); +} - return GST_FLOW_WRONG_STATE; +/* with STREAM_LOCK, PREROLL_LOCK + * + * Waits for the clock to reach @time. If @time is not valid, no + * synchronisation is done. + * If synchronisation is disabled in the element or there is no + * clock, no synchronisation is done. + * Else a blocking wait is performed on the clock. We save the ClockID + * so we can unlock the entry at any time. While we are blocking, we + * release the PREROLL_LOCK so that other threads can interrupt the entry. + */ +static GstClockReturn +gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time) +{ + GstClockID id; + GstClockReturn ret; + GstClock *clock; + GstClockTime base_time; + + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) + goto invalid_time; + + GST_OBJECT_LOCK (basesink); + if (G_UNLIKELY (!basesink->sync)) + goto no_sync; + + if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL)) + goto no_clock; + + base_time = GST_ELEMENT_CAST (basesink)->base_time; + id = gst_clock_new_single_shot_id (clock, base_time + time); + GST_OBJECT_UNLOCK (basesink); + + basesink->clock_id = id; + /* release the preroll lock while waiting */ + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); + + ret = gst_clock_id_wait (id, NULL); + + GST_PAD_PREROLL_LOCK (basesink->sinkpad); + gst_clock_id_unref (id); + basesink->clock_id = NULL; + + return ret; + +invalid_time: + { + GST_DEBUG_OBJECT (basesink, "time not valid, no sync needed"); + return GST_CLOCK_OK; + } +no_sync: + { + GST_DEBUG_OBJECT (basesink, "sync disabled"); + GST_OBJECT_UNLOCK (basesink); + return GST_CLOCK_OK; + } +no_clock: + { + GST_DEBUG_OBJECT (basesink, "no clock, can't sync"); + GST_OBJECT_UNLOCK (basesink); + return GST_CLOCK_OK; + } +} + +/* with STREAM_LOCK, PREROLL_LOCK + * + * Make sure we are in PLAYING and synchronize an object to the clock. + * + * If we need preroll, we are not in PLAYING. We try to commit the state + * if needed and then block if we still are not PLAYING. + * + * We start waiting on the clock in PLAYING. If we got interrupted, we + * immediatly try to repreroll. + * + * Some objects do not need synchronisation (most events) and so this function + * immediatly returns GST_FLOW_OK. + * + * does not take ownership of obj. + */ +static GstFlowReturn +gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj) +{ + GstClockTime start, stop; + gboolean syncable; + GstClockReturn status = GST_CLOCK_OK; + + /* get timing information for this object */ + start = stop = -1; + syncable = gst_base_sink_get_sync_times (basesink, obj, &start, &stop); + + /* a syncable object needs to participate in preroll and + * clocking. All buffers and EOS are syncable. */ + if (G_UNLIKELY (!syncable)) + goto not_syncable; + +again: + /* first do preroll, this makes sure we commit our state + * to PAUSED and can continue to PLAYING. We cannot perform + * any clock sync in PAUSED because there is no clock. + */ + while (G_UNLIKELY (basesink->need_preroll)) { + if (G_LIKELY (basesink->playing_async)) { + basesink->playing_async = FALSE; + /* commit state */ + if (G_UNLIKELY (!gst_base_sink_commit_state (basesink))) + goto stopping; + } + + /* need to recheck here because the commit state could have + * made us not need the preroll anymore */ + if (G_LIKELY (basesink->need_preroll)) { + /* block until the state changes, or we get a flush, or something */ + GST_DEBUG_OBJECT (basesink, "waiting to finish preroll"); + basesink->have_preroll = TRUE; + GST_PAD_PREROLL_WAIT (pad); + basesink->have_preroll = FALSE; + GST_DEBUG_OBJECT (basesink, "done preroll"); + if (G_UNLIKELY (basesink->flushing)) + goto flushing; + } + } + + /* preroll done, we can sync since we ar in PLAYING now. */ + GST_DEBUG_OBJECT (basesink, "waiting for clock"); + basesink->end_time = stop; + status = gst_base_sink_wait_clock (basesink, start); + GST_DEBUG_OBJECT (basesink, "clock returned %d", status); + + /* waiting could be interrupted and we can be flushing now */ + if (G_UNLIKELY (basesink->flushing)) + goto flushing; + + /* check for unlocked by a state change, we are not flushing so + * we can try to preroll on the current buffer. */ + if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) { + GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more"); + goto again; + } + + /* FIXME, update clock stats here and do some QoS */ + + return GST_FLOW_OK; + + /* ERRORS */ +not_syncable: + { + GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj); + return GST_FLOW_OK; } flushing: { - gst_base_sink_preroll_queue_flush (basesink, pad); - GST_PAD_PREROLL_UNLOCK (pad); - GST_DEBUG_OBJECT (basesink, "sink is flushing"); - + GST_DEBUG_OBJECT (basesink, "we are flushing"); return GST_FLOW_WRONG_STATE; } stopping: { - GST_PAD_PREROLL_UNLOCK (pad); - GST_DEBUG_OBJECT (basesink, "stopping"); - + GST_DEBUG_OBJECT (basesink, "stopping while commiting state"); return GST_FLOW_WRONG_STATE; } +} + +/* with STREAM_LOCK, PREROLL_LOCK, + * + * Synchronize the object on the clock and then render it. + * + * takes ownership of obj. + */ +static GstFlowReturn +gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstBaseSinkClass *bclass; + + /* synchronize this object */ + ret = gst_base_sink_do_sync (basesink, pad, obj); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto sync_failed; + + /* and now render */ + if (G_LIKELY (GST_IS_BUFFER (obj))) { + bclass = GST_BASE_SINK_GET_CLASS (basesink); + + GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj); + if (G_LIKELY (bclass->render)) + ret = bclass->render (basesink, GST_BUFFER_CAST (obj)); + } else { + GstEvent *event = GST_EVENT_CAST (obj); + gboolean ok = TRUE; + + bclass = GST_BASE_SINK_GET_CLASS (basesink); + + GST_DEBUG_OBJECT (basesink, "rendering event %p", obj); + if (bclass->event) + ok = bclass->event (basesink, event); + + if (G_LIKELY (ok)) { + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + /* the EOS event is completely handled so we mark + * ourselves as being in the EOS state. eos is also + * protected by the object lock so we can read it when + * answering the POSITION query. */ + GST_OBJECT_LOCK (basesink); + basesink->eos = TRUE; + GST_OBJECT_UNLOCK (basesink); + /* ok, now we can post the message */ + GST_DEBUG_OBJECT (basesink, "Now posting EOS"); + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_eos (GST_OBJECT_CAST (basesink))); + break; + case GST_EVENT_NEWSEGMENT: + /* configure the segment */ + gst_base_sink_configure_segment (basesink, pad, event, + &basesink->segment); + default: + break; + } + } + } + + GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj); + gst_mini_object_unref (obj); + + return ret; + + /* ERRORS */ +sync_failed: + { + GST_DEBUG_OBJECT (basesink, "do_sync returned %s, unref object %p", + gst_flow_get_name (ret), obj); + gst_mini_object_unref (obj); + + return ret; + } +} + +/* with STREAM_LOCK, PREROLL_LOCK + * + * Perform preroll on the given object. For buffers this means + * calling the preroll subclass method. + * If that succeeds, the state will be commited. + * + * function does not take ownership of obj. + */ +static GstFlowReturn +gst_base_sink_preroll_object (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj) +{ + GstFlowReturn ret; + + GST_DEBUG_OBJECT (basesink, "do preroll %p", obj); + + /* if it's a buffer, we need to call the preroll method */ + if (G_LIKELY (GST_IS_BUFFER (obj))) { + GstBaseSinkClass *bclass; + GstBuffer *buf = GST_BUFFER_CAST (obj); + + GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); + + bclass = GST_BASE_SINK_GET_CLASS (basesink); + if (bclass->preroll) + if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK) + goto preroll_failed; + } + + /* commit state */ + if (G_LIKELY (basesink->playing_async)) { + basesink->playing_async = FALSE; + if (G_UNLIKELY (!gst_base_sink_commit_state (basesink))) + goto stopping; + } + + return GST_FLOW_OK; + + /* ERRORS */ preroll_failed: { - GST_DEBUG_OBJECT (basesink, "preroll failed"); - gst_base_sink_preroll_queue_flush (basesink, pad); - GST_PAD_PREROLL_UNLOCK (pad); + GST_DEBUG_OBJECT (basesink, "preroll failed, abort state"); + gst_element_abort_state (GST_ELEMENT_CAST (basesink)); + return ret; + } +stopping: + { + GST_DEBUG_OBJECT (basesink, "stopping while commiting state"); + return GST_FLOW_WRONG_STATE; + } +} - GST_DEBUG_OBJECT (basesink, "abort state"); - gst_element_abort_state (GST_ELEMENT (basesink)); +/* with STREAM_LOCK, PREROLL_LOCK + * + * Queue an object for rendering. + * The first prerollable object queued will complete the preroll. If the + * preroll queue if filled, we render all the objects in the queue. + * + * This function takes ownership of the object. + */ +static GstFlowReturn +gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj, gboolean prerollable) +{ + GstFlowReturn ret = GST_FLOW_OK; + gint length; + if (G_LIKELY (prerollable)) + basesink->preroll_queued++; + + GST_DEBUG_OBJECT (basesink, "now %d prerolled items", + basesink->preroll_queued); + + if (G_UNLIKELY (basesink->need_preroll)) { + length = basesink->preroll_queued; + + if (length == 1) { + ret = gst_base_sink_preroll_object (basesink, pad, obj); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto preroll_failed; + } + /* need to recheck, commmit state during preroll could have made us + * not need more preroll. */ + if (G_UNLIKELY (basesink->need_preroll)) { + GQueue *q; + + /* see if we can render now. */ + if (G_UNLIKELY (length <= basesink->preroll_queue_max_len)) + goto more_preroll; + + /* we can start rendering (or blocking) */ + GST_DEBUG_OBJECT (basesink, "emptying queue"); + q = basesink->preroll_queue; + while (G_UNLIKELY (!g_queue_is_empty (q))) { + /* FIXME, do something with the return value? */ + ret = gst_base_sink_render_object (basesink, pad, g_queue_pop_head (q)); + } + } + } + + /* now render the object */ + ret = gst_base_sink_render_object (basesink, pad, obj); + basesink->preroll_queued = 0; + + return ret; + + /* special cases */ +preroll_failed: + { + GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s", + gst_flow_get_name (ret)); + gst_mini_object_unref (obj); return ret; } more_preroll: { - GST_DEBUG_OBJECT (basesink, "need more preroll data"); - GST_PAD_PREROLL_UNLOCK (pad); + /* add object to the queue and return */ + GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d", + length, basesink->preroll_queue_max_len); + g_queue_push_tail (basesink->preroll_queue, obj); return GST_FLOW_OK; } } +/* with STREAM_LOCK + * + * This function grabs the PREROLL_LOCK and adds the object to + * the queue. + * + * This function takes ownership of obj. + */ +static GstFlowReturn +gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj, gboolean prerollable) +{ + GstFlowReturn ret; + + GST_PAD_PREROLL_LOCK (pad); + if (G_UNLIKELY (basesink->flushing)) + goto flushing; + + ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable); + GST_PAD_PREROLL_UNLOCK (pad); + + return ret; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (basesink, "sink is flushing"); + GST_PAD_PREROLL_UNLOCK (pad); + gst_mini_object_unref (obj); + return GST_FLOW_WRONG_STATE; + } +} + static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event) { @@ -768,46 +1076,32 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) { GstFlowReturn ret; - /* EOS also finishes the preroll */ + /* EOS is a prerollable object */ ret = - gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (event)); + gst_base_sink_queue_object (basesink, pad, + GST_MINI_OBJECT_CAST (event), TRUE); + + if (G_UNLIKELY (ret != GST_FLOW_OK)) + result = FALSE; break; } case GST_EVENT_NEWSEGMENT: { - /* the new segment is a non prerollable item and does not block anything, - * we can just configure the current segment and return. */ - /* FIXME, if the preroll-queue-len > 1 we need to queue the newsegment - * and activate them before attempting the synchronize */ - gboolean update; - gdouble rate; - GstFormat format; - gint64 start; - gint64 stop; - gint64 time; - - /* the newsegment event is needed to bring the buffer timestamps to the - * stream time and to drop samples outside of the playback segment. */ - gst_event_parse_new_segment (event, &update, &rate, &format, - &start, &stop, &time); + GstFlowReturn ret; basesink->have_newsegment = TRUE; - GST_OBJECT_LOCK (basesink); - gst_segment_set_newsegment (&basesink->segment, update, rate, format, - start, stop, time); + /* the new segment is a non prerollable item and does not block anything, + * we need to configure the current clipping segment and insert the event + * in the queue to serialize it with the buffers for rendering. */ + gst_base_sink_configure_segment (basesink, pad, event, + basesink->abidata.ABI.clip_segment); - GST_DEBUG_OBJECT (basesink, - "received NEWSEGMENT %" GST_TIME_FORMAT " -- %" - GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %" - GST_TIME_FORMAT, - GST_TIME_ARGS (basesink->segment.start), - GST_TIME_ARGS (basesink->segment.stop), - GST_TIME_ARGS (basesink->segment.time), - GST_TIME_ARGS (basesink->segment.accum)); - GST_OBJECT_UNLOCK (basesink); - - gst_event_unref (event); + ret = + gst_base_sink_queue_object (basesink, pad, + GST_MINI_OBJECT_CAST (event), FALSE); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + result = FALSE; break; } case GST_EVENT_FLUSH_START: @@ -824,7 +1118,8 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) GST_PAD_STREAM_LOCK (pad); /* and we need to commit our state again on the next * prerolled buffer */ - gst_element_lost_state (GST_ELEMENT (basesink)); + basesink->playing_async = TRUE; + gst_element_lost_state (GST_ELEMENT_CAST (basesink)); GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); GST_PAD_STREAM_UNLOCK (pad); @@ -839,6 +1134,8 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) /* we need new segment info after the flush. */ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (basesink->abidata.ABI.clip_segment, + GST_FORMAT_UNDEFINED); basesink->have_newsegment = FALSE; GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); @@ -848,8 +1145,8 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) /* other events are sent to queue or subclass depending on if they * are serialized. */ if (GST_EVENT_IS_SERIALIZED (event)) { - gst_base_sink_handle_object (basesink, pad, - GST_MINI_OBJECT_CAST (event)); + gst_base_sink_queue_object (basesink, pad, + GST_MINI_OBJECT_CAST (event), FALSE); } else { if (bclass->event) bclass->event (basesink, event); @@ -895,273 +1192,76 @@ gst_base_sink_is_prerolled (GstBaseSink * basesink) return res; } -/* with STREAM_LOCK, PREROLL_LOCK, id should be a valid GstClockID */ -static GstClockReturn -gst_base_sink_wait (GstBaseSink * basesink, GstClockID id) -{ - GstClockReturn ret; - - basesink->clock_id = id; - /* release the preroll lock while waiting */ - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); - - ret = gst_clock_id_wait (id, NULL); - - GST_PAD_PREROLL_LOCK (basesink->sinkpad); - gst_clock_id_unref (id); - basesink->clock_id = NULL; - - if (basesink->flushing) - ret = GST_CLOCK_UNSCHEDULED; - - return ret; -} - -/* perform synchronisation on a buffer +/* with STREAM_LOCK, PREROLL_LOCK * - * 1) check if we have a clock, if not, do nothing - * 2) calculate the start and end time of the buffer - * 3) create a single shot notification to wait on - * the clock, save the entry so we can unlock it - * 4) wait on the clock, this blocks - * 5) unref the clockid again + * Takes a buffer and compare the timestamps with the last segment. + * If the buffer falls outside of the segment boundaries, drop it. + * Else queue the buffer for preroll and rendering. * - * Is called with STREAM_LOCK + * This function takes ownership of the buffer. */ -static GstClockReturn -gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) +static GstFlowReturn +gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad, + GstBuffer * buf) { - GstClockReturn result; - GstClockTime start, end; - gint64 cstart, cend; - GstBaseSinkClass *bclass; - GstClockTime base_time; - GstClock *clock; - GstClockID id; - GstClockTimeDiff stream_start, stream_end; + GstFlowReturn result; + GstClockTime start = -1, end = -1; - bclass = GST_BASE_SINK_GET_CLASS (basesink); + if (G_UNLIKELY (basesink->flushing)) + goto flushing; - start = end = -1; - if (bclass->get_times) - bclass->get_times (basesink, buffer, &start, &end); + if (G_UNLIKELY (!basesink->have_newsegment)) { + GST_ELEMENT_WARNING (basesink, STREAM, FAILED, + (_("Internal data flow problem.")), + ("Received buffer without a new-segment. Cannot sync to clock.")); + basesink->have_newsegment = TRUE; + /* this means this sink will not be able to sync to the clock */ + basesink->abidata.ABI.clip_segment->start = -1; + basesink->abidata.ABI.clip_segment->stop = -1; + basesink->segment.start = -1; + basesink->segment.stop = -1; + } + + /* check if the buffer needs to be dropped */ + /* we don't use the subclassed method as it may not return + * valid values for our purpose here */ + gst_base_sink_get_times (basesink, buf, &start, &end); GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end)); - /* if we don't have a timestamp, we don't sync */ - if (!GST_CLOCK_TIME_IS_VALID (start)) - goto invalid_start; - - if (basesink->segment.format == GST_FORMAT_TIME) { - /* clip */ - if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, - (gint64) start, (gint64) end, &cstart, &cend)) + if (GST_CLOCK_TIME_IS_VALID (start) && + (basesink->abidata.ABI.clip_segment->format == GST_FORMAT_TIME)) { + if (G_UNLIKELY (!gst_segment_clip (basesink->abidata.ABI.clip_segment, + GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL))) goto out_of_segment; - - if (start != cstart || end != cend) { - GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT - ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart), - GST_TIME_ARGS (cend)); - } - - /* save last valid times seen. */ - if (GST_CLOCK_TIME_IS_VALID (cend)) - gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, - (gint64) cend); - else - gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, - (gint64) cstart); - } else { - /* no clipping for formats different from GST_FORMAT_TIME */ - cstart = start; - cend = end; } - if (!((basesink->segment.format == GST_FORMAT_TIME) - || (basesink->segment.accum == 0))) - goto no_segment; - - GST_OBJECT_LOCK (basesink); - if (!basesink->sync) - goto no_sync; - - if ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL) - goto no_clock; - - /* now do clocking, LOCK is helt */ - stream_start = - gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cstart); - - base_time = GST_ELEMENT_CAST (basesink)->base_time; - id = gst_clock_new_single_shot_id (clock, stream_start + base_time); - GST_OBJECT_UNLOCK (basesink); - - GST_LOG_OBJECT (basesink, - "waiting for clock, base time %" GST_TIME_FORMAT - " stream_start %" GST_TIME_FORMAT, - GST_TIME_ARGS (base_time), GST_TIME_ARGS (stream_start)); - - /* also save end_time of this buffer so that we can wait - * to signal EOS */ - stream_end = - gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cend); - - if (GST_CLOCK_TIME_IS_VALID (stream_end)) - basesink->end_time = stream_end + base_time; - else - basesink->end_time = GST_CLOCK_TIME_NONE; - - result = gst_base_sink_wait (basesink, id); - - GST_LOG_OBJECT (basesink, "clock entry done: %d", result); + /* now we can process the buffer in the queue, this function takes ownership + * of the buffer */ + result = + gst_base_sink_queue_object_unlocked (basesink, pad, + GST_MINI_OBJECT_CAST (buf), TRUE); return result; - /* special cases */ -invalid_start: + /* ERRORS */ +flushing: { - GST_DEBUG_OBJECT (basesink, "start not valid, cannot sync"); - return GST_CLOCK_OK; + GST_DEBUG_OBJECT (basesink, "sink is flushing"); + gst_buffer_unref (buf); + return GST_FLOW_WRONG_STATE; } out_of_segment: { - GST_LOG_OBJECT (basesink, "buffer skipped, not in segment"); - /* we return BADTIME to make callers drop this buffer. */ - return GST_CLOCK_BADTIME; - } -no_sync: - { - GST_DEBUG_OBJECT (basesink, "no need to sync"); - GST_OBJECT_UNLOCK (basesink); - return GST_CLOCK_OK; - } -no_clock: - { - GST_DEBUG_OBJECT (basesink, "no clock, can't sync"); - GST_OBJECT_UNLOCK (basesink); - return GST_CLOCK_OK; - } -no_segment: - { - GST_DEBUG_OBJECT (basesink, "no segment info, can't sync"); - return GST_CLOCK_OK; + GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment"); + gst_buffer_unref (buf); + return GST_FLOW_OK; } } - -/* handle an event - * - * 2) render the event - * 3) unref the event - * - * called with STREAM_LOCK, PREROLL_LOCK +/* with STREAM_LOCK */ -static inline gboolean -gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) -{ - GstBaseSinkClass *bclass; - gboolean ret; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_EOS: - { - GstClock *clock; - gboolean unlock = TRUE; - - GST_OBJECT_LOCK (basesink); - basesink->eos = TRUE; - basesink->eos_queued = FALSE; - if ((clock = GST_ELEMENT_CLOCK (basesink)) != NULL) { - /* wait for last buffer to finish if we have a valid end time */ - if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) { - GstClockID id; - - id = gst_clock_new_single_shot_id (clock, basesink->end_time); - unlock = FALSE; - GST_OBJECT_UNLOCK (basesink); - - gst_base_sink_wait (basesink, id); - basesink->end_time = GST_CLOCK_TIME_NONE; - } - } - if (unlock) - GST_OBJECT_UNLOCK (basesink); - break; - } - default: - break; - } - - bclass = GST_BASE_SINK_GET_CLASS (basesink); - if (bclass->event) - ret = bclass->event (basesink, event); - else - ret = TRUE; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_EOS: - /* if we are still EOS, we can post the EOS message */ - if (basesink->eos) { - /* ok, now we can post the message */ - GST_DEBUG_OBJECT (basesink, "Now posting EOS"); - gst_element_post_message (GST_ELEMENT_CAST (basesink), - gst_message_new_eos (GST_OBJECT_CAST (basesink))); - } - break; - default: - break; - } - - GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); - gst_event_unref (event); - - return ret; -} - -/* handle a buffer - * - * 1) first sync on the buffer - * 2) render the buffer - * 3) unref the buffer - * - * called with STREAM_LOCK, PREROLL_LOCK - */ -static inline GstFlowReturn -gst_base_sink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf) -{ - GstFlowReturn ret = GST_FLOW_OK; - GstClockReturn status; - - status = gst_base_sink_do_sync (basesink, buf); - switch (status) { - case GST_CLOCK_EARLY: - GST_DEBUG_OBJECT (basesink, "buffer too late!, rendering anyway"); - /* fallthrough for now */ - case GST_CLOCK_OK: - { - GstBaseSinkClass *bclass; - - bclass = GST_BASE_SINK_GET_CLASS (basesink); - if (bclass->render) - ret = bclass->render (basesink, buf); - break; - } - case GST_CLOCK_BADTIME: - /* when out of segment, should normally not happen */ - case GST_CLOCK_UNSCHEDULED: - /* unlocked by a state change */ - default: - GST_DEBUG_OBJECT (basesink, "clock returned %d, not rendering", status); - break; - } - - GST_DEBUG_OBJECT (basesink, "buffer unref after render %p", basesink, buf); - gst_buffer_unref (buf); - - return ret; -} - static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buf) { @@ -1170,11 +1270,12 @@ gst_base_sink_chain (GstPad * pad, GstBuffer * buf) basesink = GST_BASE_SINK (gst_pad_get_parent (pad)); - if (!(basesink->pad_mode == GST_ACTIVATE_PUSH)) + if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) goto wrong_mode; - result = - gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT_CAST (buf)); + GST_PAD_PREROLL_LOCK (pad); + result = gst_base_sink_chain_unlocked (basesink, pad, buf); + GST_PAD_PREROLL_UNLOCK (pad); done: gst_object_unref (basesink); @@ -1185,9 +1286,11 @@ done: wrong_mode: { GST_OBJECT_LOCK (pad); - g_warning ("Push on pad %s:%s, but it was not activated in push mode", + GST_WARNING_OBJECT (basesink, + "Push on pad %s:%s, but it was not activated in push mode", GST_DEBUG_PAD_NAME (pad)); GST_OBJECT_UNLOCK (pad); + gst_buffer_unref (buf); /* we don't post an error message this will signal to the peer * pushing that EOS is reached. */ result = GST_FLOW_UNEXPECTED; @@ -1195,6 +1298,8 @@ wrong_mode: } } +/* with STREAM_LOCK + */ static void gst_base_sink_loop (GstPad * pad) { @@ -1207,25 +1312,47 @@ gst_base_sink_loop (GstPad * pad) g_assert (basesink->pad_mode == GST_ACTIVATE_PULL); result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf); - if (result != GST_FLOW_OK) + if (G_UNLIKELY (result != GST_FLOW_OK)) goto paused; - result = gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (buf)); - if (result != GST_FLOW_OK) + if (G_UNLIKELY (buf == NULL)) + goto no_buffer; + + GST_PAD_PREROLL_LOCK (pad); + result = gst_base_sink_chain_unlocked (basesink, pad, buf); + GST_PAD_PREROLL_UNLOCK (pad); + if (G_UNLIKELY (result != GST_FLOW_OK)) goto paused; gst_object_unref (basesink); - /* default */ return; + /* ERRORS */ paused: { - gst_base_sink_event (pad, gst_event_new_eos ()); - gst_object_unref (basesink); + GST_LOG_OBJECT (basesink, "pausing task, reason %s", + gst_flow_get_name (result)); gst_pad_pause_task (pad); + /* fatal errors and NOT_LINKED cause EOS */ + if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) { + gst_base_sink_event (pad, gst_event_new_eos ()); + /* EOS does not cause an ERROR message */ + if (result != GST_FLOW_UNEXPECTED) { + GST_ELEMENT_ERROR (basesink, STREAM, FAILED, + (_("Internal data stream error.")), + ("stream stopped, reason %s", gst_flow_get_name (result))); + } + } + gst_object_unref (basesink); return; } +no_buffer: + { + GST_LOG_OBJECT (basesink, "no buffer, pausing"); + result = GST_FLOW_ERROR; + goto paused; + } } static gboolean @@ -1363,6 +1490,8 @@ gst_base_sink_activate_pull (GstPad * pad, gboolean active) * mode works just fine without having a newsegment before the * first buffer */ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (basesink->abidata.ABI.clip_segment, + GST_FORMAT_UNDEFINED); basesink->have_newsegment = TRUE; /* set the pad mode before starting the task so that it's in the @@ -1397,6 +1526,7 @@ gst_base_sink_activate_pull (GstPad * pad, gboolean active) return result; } +/* send an event to our sinkpad peer. */ static gboolean gst_base_sink_send_event (GstElement * element, GstEvent * event) { @@ -1455,7 +1585,7 @@ gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format, goto wrong_state; /* and we need a clock */ - if ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL) + if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL)) goto wrong_state; /* collect all data we need holding the lock */ @@ -1572,32 +1702,40 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_PAUSED: /* need to complete preroll before this state change completes, there * is no data flow in READY so we can safely assume we need to preroll. */ - basesink->offset = 0; GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll"); + gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (basesink->abidata.ABI.clip_segment, + GST_FORMAT_UNDEFINED); + basesink->have_newsegment = FALSE; + basesink->offset = 0; basesink->have_preroll = FALSE; basesink->need_preroll = TRUE; - gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); - basesink->have_newsegment = FALSE; + basesink->playing_async = TRUE; + basesink->eos = FALSE; ret = GST_STATE_CHANGE_ASYNC; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING"); GST_PAD_PREROLL_LOCK (basesink->sinkpad); - if (!gst_base_sink_is_prerolled (basesink)) { - ret = GST_STATE_CHANGE_ASYNC; - basesink->need_preroll = TRUE; - } else { + if (gst_base_sink_is_prerolled (basesink)) { /* no preroll needed anymore now. */ + GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll"); + basesink->playing_async = FALSE; basesink->need_preroll = FALSE; if (basesink->eos) { /* need to post EOS message here */ GST_DEBUG_OBJECT (basesink, "Now posting EOS"); gst_element_post_message (GST_ELEMENT_CAST (basesink), gst_message_new_eos (GST_OBJECT_CAST (basesink))); + } else { + GST_DEBUG_OBJECT (basesink, "signal preroll"); + GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); } + } else { + GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, need preroll"); + basesink->need_preroll = TRUE; + basesink->playing_async = TRUE; + ret = GST_STATE_CHANGE_ASYNC; } - GST_DEBUG_OBJECT (basesink, "signal preroll"); - GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; default: @@ -1608,12 +1746,13 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) GstStateChangeReturn bret; bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - if (bret == GST_STATE_CHANGE_FAILURE) + if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE)) goto activate_failed; } switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED"); GST_PAD_PREROLL_LOCK (basesink->sinkpad); basesink->need_preroll = TRUE; if (basesink->clock_id) { @@ -1623,11 +1762,13 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) if (bclass->unlock) bclass->unlock (basesink); - GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, waiting for lock"); /* if we don't have a preroll buffer we need to wait for a preroll and * return ASYNC. */ - if (!gst_base_sink_is_prerolled (basesink)) { + if (gst_base_sink_is_prerolled (basesink)) { + basesink->playing_async = FALSE; + } else { GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll"); + basesink->playing_async = TRUE; ret = GST_STATE_CHANGE_ASYNC; } GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); @@ -1637,7 +1778,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_NULL: if (bclass->stop) if (!bclass->stop (basesink)) { - GST_WARNING ("failed to stop"); + GST_WARNING_OBJECT (basesink, "failed to stop"); } break; default: diff --git a/libs/gst/base/gstbasesink.h b/libs/gst/base/gstbasesink.h index c0b4ed5954..a47e10455a 100644 --- a/libs/gst/base/gstbasesink.h +++ b/libs/gst/base/gstbasesink.h @@ -88,7 +88,15 @@ struct _GstBaseSink { gboolean flushing; /*< private >*/ - gpointer _gst_reserved[GST_PADDING_LARGE]; + union { + struct { + /* segment used for clipping incomming buffers */ + GstSegment *clip_segment; + } ABI; + /* adding + 0 to mark ABI change to be undone later */ + gpointer _gst_reserved[GST_PADDING_LARGE + 0]; + } abidata; + }; struct _GstBaseSinkClass { diff --git a/tests/check/elements/fakesink.c b/tests/check/elements/fakesink.c index a96dce01eb..0ca1a5203c 100644 --- a/tests/check/elements/fakesink.c +++ b/tests/check/elements/fakesink.c @@ -166,7 +166,7 @@ GST_START_TEST (test_clipping) fail_unless (current == GST_STATE_PAUSED); fail_unless (pending == GST_STATE_VOID_PENDING); - /* pause should render the buffer */ + /* playing should render the buffer */ ret = gst_element_set_state (sink, GST_STATE_PLAYING); fail_unless (ret == GST_STATE_CHANGE_SUCCESS); @@ -217,6 +217,81 @@ GST_START_TEST (test_clipping) GST_END_TEST; +GST_START_TEST (test_preroll_sync) +{ + GstElement *pipeline, *sink; + GstPad *sinkpad; + GstStateChangeReturn ret; + + /* create sink */ + pipeline = gst_pipeline_new ("pipeline"); + fail_if (pipeline == NULL); + + sink = gst_element_factory_make ("fakesink", "sink"); + fail_if (sink == NULL); + g_object_set (G_OBJECT (sink), "sync", TRUE, NULL); + + gst_bin_add (GST_BIN (pipeline), sink); + + sinkpad = gst_element_get_pad (sink, "sink"); + fail_if (sinkpad == NULL); + + /* make pipeline and element ready to accept data */ + ret = gst_element_set_state (pipeline, GST_STATE_PAUSED); + fail_unless (ret == GST_STATE_CHANGE_ASYNC); + + /* send segment */ + { + GstEvent *segment; + gboolean eret; + + GST_DEBUG ("sending segment"); + segment = gst_event_new_new_segment (FALSE, + 1.0, GST_FORMAT_TIME, 0 * GST_SECOND, 2 * GST_SECOND, 0 * GST_SECOND); + + eret = gst_pad_send_event (sinkpad, segment); + fail_if (eret == FALSE); + } + + /* send buffer that should block and finish preroll */ + { + GstBuffer *buffer; + GstFlowReturn fret; + ChainData *data; + GstState current, pending; + + buffer = gst_buffer_new (); + GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND; + GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND; + + GST_DEBUG ("sending buffer to finish preroll"); + data = chain_async (sinkpad, buffer); + fail_if (data == NULL); + + /* state should now eventually change to PAUSED */ + ret = + gst_element_get_state (pipeline, ¤t, &pending, + GST_CLOCK_TIME_NONE); + fail_unless (ret == GST_STATE_CHANGE_SUCCESS); + fail_unless (current == GST_STATE_PAUSED); + fail_unless (pending == GST_STATE_VOID_PENDING); + + /* playing should render the buffer */ + ret = gst_element_set_state (pipeline, GST_STATE_PLAYING); + fail_unless (ret == GST_STATE_CHANGE_SUCCESS); + + /* and we should get a success return value */ + fret = chain_async_return (data); + fail_if (fret != GST_FLOW_OK); + } + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE); + gst_object_unref (sinkpad); + gst_object_unref (pipeline); +} + +GST_END_TEST; + Suite * fakesink_suite (void) { @@ -225,6 +300,7 @@ fakesink_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_clipping); + tcase_add_test (tc_chain, test_preroll_sync); return s; }