diff --git a/ChangeLog b/ChangeLog index 55823eb580..0ddf614928 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,31 @@ +2006-02-02 Wim Taymans + + * docs/design/part-element-sink.txt: + Updated document. + + * libs/gst/base/gstbasesink.c: (gst_base_sink_init), + (gst_base_sink_finalize), (gst_base_sink_preroll_queue_flush), + (gst_base_sink_configure_segment), (gst_base_sink_commit_state), + (gst_base_sink_get_sync_times), (gst_base_sink_wait_clock), + (gst_base_sink_do_sync), (gst_base_sink_render_object), + (gst_base_sink_preroll_object), + (gst_base_sink_queue_object_unlocked), + (gst_base_sink_queue_object), (gst_base_sink_event), + (gst_base_sink_chain_unlocked), (gst_base_sink_chain), + (gst_base_sink_loop), (gst_base_sink_activate_pull), + (gst_base_sink_get_position), (gst_base_sink_change_state): + * libs/gst/base/gstbasesink.h: + Totally refactored matching the design doc. + Use two segments, one to clip incomming buffers and another to + perform sync. + Handle queueing correctly, bypass the queue when playing. + Make EOS cancelable. + Handle errors correctly when operating in pull based mode. + + * tests/check/elements/fakesink.c: (GST_START_TEST), + (fakesink_suite): + Added new check for sinks. + 2006-02-02 Wim Taymans * gst/gstsegment.c: (gst_segment_clip): diff --git a/docs/design/part-element-sink.txt b/docs/design/part-element-sink.txt index 686346354f..cf80eb0043 100644 --- a/docs/design/part-element-sink.txt +++ b/docs/design/part-element-sink.txt @@ -31,96 +31,170 @@ Events other than EOS do not complete the preroll stage. sink overview ------------- - /* commit the state. We return TRUE if we can continue - * streaming, FALSE in the case we go to a READY or NULL state. - * if we go to PLAYING, we don't need to block on preroll. - */ + - TODO: PREROLL_LOCK can be removed and we can safely use the STREAM_LOCK. + + + + # commit the state. We return TRUE if we can continue + # streaming, FALSE in the case we go to a READY or NULL state. + # if we go to PLAYING, we don't need to block on preroll. commit + { LOCK - switch (pending) { + switch (pending) case PLAYING: - need_preroll = FALSE; - break; + need_preroll = FALSE + break case PAUSED: - break; + break case READY: case NULL: - return FALSE; + return FALSE case VOID: - return TRUE; - } - /* update state */ - state = pending; - next = VOID; - pending = VOID; - UNLOCK - return TRUE; + return TRUE - /* handle a prerollable item (EOS or buffer). It is - * always called with the PREROLL_LOCK helt. - * need_preroll indicates that we must perform, commit and - * potentially block on preroll. - */ - handle (time) + # update state + state = pending + next = VOID + pending = VOID + UNLOCK + return TRUE + } + + # sync an object. We have to wait for the element to reach + # the PLAYING state before we can wait on the clock. + # some items do not need synchronisation (most events) so the + # get_times method returns FALSE (not syncable) + # need_preroll indicates that we are not in the PLAYING state + # and therefore need to commit and potentially block on preroll + # if our clock_wait got interrupted we commit and block again. + # The reason for this is that the current item being rendered is + # not yet finished and we can use that item to finish preroll. + do_sync (obj) + { + # get timing information for this object + syncable = get_times (obj, &start, &stop) + if (!syncable) + return OK; again: - while (need_preroll) { - preroll - if (!commit) - return WRONG_STATE - /* commit could have made us not need preroll anymore. */ - if (need_preroll) { - /* release PREROLL_LOCK and wait. prerolled can be observed - * and will be TRUE */ - prerolled = TRUE; + while (need_preroll) + if (need_commit) + need_commit = FALSE + if (!commit) + return WRONG_STATE + + if (need_preroll) + # release PREROLL_LOCK and wait. prerolled can be observed + # and will be TRUE + prerolled = TRUE PREROLL_WAIT (releasing PREROLL_LOCK) - prerolled = FALSE; + prerolled = FALSE if (flushing) return WRONG_STATE - } - } - if (clock && sync) { - /* the only way we can regain the prerolled state is when - * the clock entry gets unscheduled, we then preroll (again) on the - * current item, else we render and preroll on the next buffer. */ + + if (valid (start || stop)) PREROLL_UNLOCK - ret = wait_clock; + end_time = stop + ret = wait_clock (obj,start) PREROLL_LOCK - if (flushing) | /* sinks that sync on buffer contents do like this */ - return WRONG_STATE | while (more_to_render) { - if (ret == UNSCHEDULED) | ret = render - goto again; | if (ret == interrupted) - } | prerolled = TRUE; - render ----->| PREROLL_WAIT (releasing PREROLL_LOCK) - | prerolled = FALSE; + if (flushing) + return WRONG_STATE + # if the clock was unscheduled, we redo the + # preroll + if (ret == UNSCHEDULED) + goto again + } + + # render a prerollable item (EOS or buffer). It is + # always called with the PREROLL_LOCK helt. + render_object (obj) + { + ret = do_sync (obj) + if (ret != OK) + return ret; + + # preroll and syncing done, now we can render + render(obj) + } + | # sinks that sync on buffer contents do like this + | while (more_to_render) + | ret = render + | if (ret == interrupted) + | prerolled = TRUE + render (buffer) ----->| PREROLL_WAIT (releasing PREROLL_LOCK) + | prerolled = FALSE | if (flushing) | return WRONG_STATE - | } - /* various event functions */ + | + + # queue a prerollable item (EOS or buffer). It is + # always called with the PREROLL_LOCK helt. + # This function will commit the state when receiving the + # first prerollable item. + # items are then added to the rendering queue or rendered + # right away if no preroll is needed. + queue (obj, prerollable) + { + if (prerollable) + queuelen++ + + if (need_preroll) + # first item in the queue while we need preroll + # will complete state change and call preroll + if (queuelen == 1) + preroll (obj) + if (need_commit) + need_commit = FALSE + if (!commit) + return WRONG_STATE + + # then see if we need more preroll items before we + # can block + if (need_preroll) + if (queuelen <= maxqueue) + queue.add (obj) + return OK + + # now clear the queue and render each item before + # rendering the current item. + while (queue.hasItem) + render_object (queue.remove()) + + render_object (obj) + queuelen = 0 + } + + # various event functions event EOS: + # events must complete preroll too STREAM_LOCK PREROLL_LOCK if (flushing) return FALSE - ret = handle (end_time); + ret = queue (event, TRUE) if (ret == WRONG_STATE) return FALSE - post_eos - eos = TRUE; PREROLL_UNLOCK STREAM_UNLOCK - break; + break NEWSEGMENT: + # the newsegment must be used to clip incomming + # buffers. Then then go into the queue as non-prerollable + # items used for syncing the buffers STREAM_LOCK PREROLL_LOCK if (flushing) return FALSE set_clip - event + ret = queue (event, FALSE) + if (ret == WRONG_STATE) + return FALSE PREROLL_UNLOCK STREAM_UNLOCK - break; + break FLUSH_START: + # set flushing and unblock all that is waiting event ----> subclasses can interrupt render PREROLL_LOCK flushing = TRUE @@ -130,67 +204,89 @@ sink overview STREAM_LOCK lost_state STREAM_UNLOCK - break; + break FLUSH_END: + # unset flushing and clear all data and eos STREAM_LOCK event PREROLL_LOCK + queue.clear + queuelen = 0 flushing = FALSE - eos = FALSE; + eos = FALSE PREROLL_UNLOCK STREAM_UNLOCK - break; + break + # the chain function checks the buffer falls within the + # configured segment and queues the buffer for preroll and + # rendering chain STREAM_LOCK PREROLL_LOCK if (flushing) return WRONG_STATE if (clip) - handle (time); + queue (buffer, TRUE) PREROLL_UNLOCK STREAM_UNLOCK state switch (transition) READY_PAUSED: - ret = ASYNC; + # no datapassing is going on so we always return ASYNC + ret = ASYNC + need_commit = TRUE eos = FALSE flushing = FALSE - need_preroll = TRUE; - prerolled = FALSE; - break; + need_preroll = TRUE + prerolled = FALSE + break PAUSED_PLAYING: + # we grab the preroll lock. This we can only do if the + # chain function is either doing some clock sync, we are + # waiting for preroll or the chain function is not being called. PREROLL_LOCK if (prerolled || eos) - PREROLL_SIGNAL - ret = OK; - need_preroll = FALSE; + ret = OK + need_commit = FALSE + need_preroll = FALSE if (eos) post_eos + else + PREROLL_SIGNAL else - need_preroll = TRUE; - ret = ASYNC; + need_preroll = TRUE + need_commit = TRUE + ret = ASYNC PREROLL_UNLOCK - break; + break PLAYING_PAUSED: ---> subclass can interrupt render + # we grab the preroll lock. This we can only do if the + # chain function is either doing some clock sync + # or the chain function is not being called. PREROLL_LOCK - need_preroll = TRUE; + need_preroll = TRUE unlock_clock if (prerolled || eos) - ret = OK; + ret = OK else - ret = ASYNC; + ret = ASYNC PREROLL_UNLOCK - break; + break PAUSED_READY: ---> subclass can interrupt render + # we grab the preroll lock. Set to flushing and unlock + # everything. This should exit the chain functions and stop + # streaming. PREROLL_LOCK flushing = TRUE unlock_clock + queue.clear + queuelen = 0 PREROLL_SIGNAL - ret = OK; + ret = OK PREROLL_UNLOCK - break; + break diff --git a/libs/gst/base/gstbasesink.c b/libs/gst/base/gstbasesink.c index dfd14a8ab8..263f228e84 100644 --- a/libs/gst/base/gstbasesink.c +++ b/libs/gst/base/gstbasesink.c @@ -199,10 +199,6 @@ static gboolean gst_base_sink_activate (GstPad * pad); static gboolean gst_base_sink_activate_push (GstPad * pad, gboolean active); static gboolean gst_base_sink_activate_pull (GstPad * pad, gboolean active); static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event); -static inline GstFlowReturn gst_base_sink_handle_buffer (GstBaseSink * basesink, - GstBuffer * buf); -static inline gboolean gst_base_sink_handle_event (GstBaseSink * basesink, - GstEvent * event); static void gst_base_sink_base_init (gpointer g_class) @@ -341,10 +337,11 @@ gst_base_sink_init (GstBaseSink * basesink, gpointer g_class) GST_DEBUG_FUNCPTR (gst_base_sink_event)); gst_pad_set_chain_function (basesink->sinkpad, GST_DEBUG_FUNCPTR (gst_base_sink_chain)); - gst_element_add_pad (GST_ELEMENT (basesink), basesink->sinkpad); + gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad); basesink->pad_mode = GST_ACTIVATE_NONE; basesink->preroll_queue = g_queue_new (); + basesink->abidata.ABI.clip_segment = gst_segment_new (); basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH; basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL; @@ -362,6 +359,7 @@ gst_base_sink_finalize (GObject * object) basesink = GST_BASE_SINK (object); g_queue_free (basesink->preroll_queue); + gst_segment_free (basesink->abidata.ABI.clip_segment); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -433,50 +431,16 @@ gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size, return GST_FLOW_OK; } -/* with PREROLL_LOCK, STREAM_LOCK */ -static GstFlowReturn -gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad) -{ - GstMiniObject *obj; - GQueue *q = basesink->preroll_queue; - GstFlowReturn ret; - - ret = GST_FLOW_OK; - - if (q) { - GST_DEBUG_OBJECT (basesink, "emptying queue"); - while ((obj = g_queue_pop_head (q))) { - basesink->preroll_queued--; - - if (G_LIKELY (GST_IS_BUFFER (obj))) { - basesink->buffers_queued--; - GST_DEBUG_OBJECT (basesink, "popped buffer %p", obj); - ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER_CAST (obj)); - } else { - basesink->events_queued--; - GST_DEBUG_OBJECT (basesink, "popped event %p", obj); - gst_base_sink_handle_event (basesink, GST_EVENT_CAST (obj)); - ret = GST_FLOW_OK; - } - } - GST_DEBUG_OBJECT (basesink, "queue empty"); - } - return ret; -} - /* with PREROLL_LOCK, STREAM_LOCK */ static void gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad) { GstMiniObject *obj; - GQueue *q = basesink->preroll_queue; GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink); - if (q) { - while ((obj = g_queue_pop_head (q))) { - GST_DEBUG_OBJECT (basesink, "popped %p", obj); - gst_mini_object_unref (obj); - } + while ((obj = g_queue_pop_head (basesink->preroll_queue))) { + GST_DEBUG_OBJECT (basesink, "popped %p", obj); + gst_mini_object_unref (obj); } /* we can't have EOS anymore now */ basesink->eos = FALSE; @@ -489,6 +453,36 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad) GST_PAD_PREROLL_SIGNAL (pad); } +/* with STREAM_LOCK, configures given segment with the event information. */ +static void +gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad, + GstEvent * event, GstSegment * segment) +{ + gboolean update; + gdouble rate; + GstFormat format; + gint64 start; + gint64 stop; + gint64 time; + + /* the newsegment event is needed to bring the buffer timestamps to the + * stream time and to drop samples outside of the playback segment. */ + gst_event_parse_new_segment (event, &update, &rate, &format, + &start, &stop, &time); + + GST_OBJECT_LOCK (basesink); + gst_segment_set_newsegment (segment, update, rate, format, start, stop, time); + + GST_DEBUG_OBJECT (basesink, + "configured NEWSEGMENT %" GST_TIME_FORMAT " -- %" + GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %" + GST_TIME_FORMAT, + GST_TIME_ARGS (segment->start), + GST_TIME_ARGS (segment->stop), + GST_TIME_ARGS (segment->time), GST_TIME_ARGS (segment->accum)); + GST_OBJECT_UNLOCK (basesink); +} + /* with PREROLL_LOCK, STREAM_LOCK */ static gboolean gst_base_sink_commit_state (GstBaseSink * basesink) @@ -507,6 +501,7 @@ gst_base_sink_commit_state (GstBaseSink * basesink) switch (pending) { case GST_STATE_PLAYING: + GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING"); basesink->need_preroll = FALSE; post_playing = TRUE; /* post PAUSED too when we were READY */ @@ -515,7 +510,7 @@ gst_base_sink_commit_state (GstBaseSink * basesink) } break; case GST_STATE_PAUSED: - basesink->need_preroll = TRUE; + GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED"); post_paused = TRUE; post_pending = GST_STATE_VOID_PENDING; break; @@ -556,200 +551,513 @@ gst_base_sink_commit_state (GstBaseSink * basesink) nothing_pending: { + GST_DEBUG_OBJECT (basesink, "nothing to commit"); GST_OBJECT_UNLOCK (basesink); return TRUE; } stopping: { /* app is going to READY */ + GST_DEBUG_OBJECT (basesink, "stopping"); basesink->need_preroll = FALSE; + basesink->flushing = TRUE; GST_OBJECT_UNLOCK (basesink); return FALSE; } } -/* with STREAM_LOCK */ -static GstFlowReturn -gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad, - GstMiniObject * obj) + +/* with STREAM_LOCK, PREROLL_LOCK + * + * Returns TRUE if the object needs synchronisation and takes therefore + * part in prerolling. + * + * start and stop cannot be NULL. + */ +static gboolean +gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj, + GstClockTime * start, GstClockTime * stop) { - gint length; - gboolean have_buffer; - GstFlowReturn ret; + GstClockTime sstart, sstop; + gint64 cstart, cstop; + GstBaseSinkClass *bclass; + GstBuffer *buffer; - GST_PAD_PREROLL_LOCK (pad); - if (basesink->flushing) - goto in_flushing; - - /* push object on the queue */ - GST_DEBUG_OBJECT (basesink, "push %p on preroll_queue", obj); - g_queue_push_tail (basesink->preroll_queue, obj); - - have_buffer = GST_IS_BUFFER (obj); - if (G_LIKELY (have_buffer)) { - GstBuffer *buf = GST_BUFFER_CAST (obj); - GstClockTime start = -1, end = -1; - - if (!basesink->have_newsegment) { - GST_ELEMENT_WARNING (basesink, STREAM, FAILED, - (_("Internal data flow problem.")), - ("Received buffer without a new-segment. Cannot sync to clock.")); - basesink->have_newsegment = TRUE; - /* this means this sink will not be able to sync to the clock */ - basesink->segment.start = -1; - basesink->segment.stop = -1; - } - - /* check if the buffer needs to be dropped */ - /* we don't use the subclassed method as it may not return - * valid values for our purpose here */ - gst_base_sink_get_times (basesink, buf, &start, &end); - - GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT - ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end)); - - if (GST_CLOCK_TIME_IS_VALID (start) && - (basesink->segment.format == GST_FORMAT_TIME)) { - if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, - (gint64) start, (gint64) end, NULL, NULL)) - goto out_of_segment; - } - basesink->buffers_queued++; - basesink->preroll_queued++; - } else { + if (G_UNLIKELY (GST_IS_EVENT (obj))) { GstEvent *event = GST_EVENT_CAST (obj); switch (GST_EVENT_TYPE (event)) { - /* only EOS finishes preroll */ + /* EOS event needs syncing */ case GST_EVENT_EOS: - basesink->preroll_queued++; - basesink->eos_queued = TRUE; - break; + *start = basesink->end_time; + *stop = -1; + return TRUE; + /* other events do not need syncing */ + /* FIXME, maybe NEWSEGMENT might need synchronisation + * since the POSITION query depends on accumulated times + */ default: - break; - } - basesink->events_queued++; - } - - GST_DEBUG_OBJECT (basesink, - "now %d preroll, %d buffers, %d events on queue", - basesink->preroll_queued, - basesink->buffers_queued, basesink->events_queued); - - /* check if we are prerolling */ - if (G_LIKELY (!basesink->need_preroll)) - goto no_preroll; - - length = basesink->preroll_queued; - GST_DEBUG_OBJECT (basesink, "prerolled length %d", length); - - if (length == 1) { - GST_DEBUG_OBJECT (basesink, "do preroll %p", obj); - - /* if it's a buffer, we need to call the preroll method */ - if (G_LIKELY (have_buffer)) { - GstBaseSinkClass *bclass; - GstBuffer *buf = GST_BUFFER_CAST (obj); - - GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT, - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); - - bclass = GST_BASE_SINK_GET_CLASS (basesink); - if (bclass->preroll) - if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK) - goto preroll_failed; - } - - /* commit state */ - if (!gst_base_sink_commit_state (basesink)) - goto stopping; - - /* it is possible that commiting the state made us go to PLAYING - * now in which case we don't need to block anymore. */ - if (!basesink->need_preroll) { - goto no_preroll; + return FALSE; } } - /* see if we need to block now. */ - if (G_UNLIKELY (length <= basesink->preroll_queue_max_len)) - goto more_preroll; + /* else do buffer sync code */ + buffer = GST_BUFFER_CAST (obj); - /* block until the state changes, or we get a flush, or something */ - GST_DEBUG_OBJECT (basesink, "waiting to finish preroll"); - basesink->have_preroll = TRUE; - GST_PAD_PREROLL_WAIT (pad); - basesink->have_preroll = FALSE; - GST_DEBUG_OBJECT (basesink, "done preroll"); - if (G_UNLIKELY (basesink->flushing)) - goto flushing; + bclass = GST_BASE_SINK_GET_CLASS (basesink); - /* we can start rendering the data now */ -no_preroll: - GST_DEBUG_OBJECT (basesink, "no preroll needed"); - /* render all our buffers now */ - ret = gst_base_sink_preroll_queue_empty (basesink, pad); + sstart = sstop = -1; + if (bclass->get_times) + bclass->get_times (basesink, buffer, &sstart, &sstop); - GST_PAD_PREROLL_UNLOCK (pad); + GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT + ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (sstart), + GST_TIME_ARGS (sstop)); - return ret; + if (G_LIKELY (basesink->segment.format == GST_FORMAT_TIME)) { + /* clip */ + if (G_UNLIKELY (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, + (gint64) sstart, (gint64) sstop, &cstart, &cstop))) + goto out_of_segment; + + if (G_UNLIKELY (sstart != cstart || sstop != cstop)) { + GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT + ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart), + GST_TIME_ARGS (cstop)); + } + + /* save last valid times seen. */ + if (GST_CLOCK_TIME_IS_VALID (cstop)) + gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, + (gint64) cstop); + else + gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, + (gint64) cstart); + } else { + if (basesink->segment.accum == 0) { + /* no clipping for formats different from GST_FORMAT_TIME */ + cstart = sstart; + cstop = sstop; + } else { + cstart = -1; + cstop = -1; + } + } + + *start = + gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cstart); + *stop = + gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cstop); + + /* buffers always need syncing and preroll */ + return TRUE; /* special cases */ out_of_segment: { - GstBuffer *buf; - - GST_DEBUG_OBJECT (basesink, "dropping buffer, out of segment"); - /* take the buffer off the queue again */ - buf = GST_BUFFER (g_queue_pop_tail (basesink->preroll_queue)); - - gst_buffer_unref (buf); - GST_PAD_PREROLL_UNLOCK (pad); - - return GST_FLOW_OK; + /* should not happen, we return FALSE so that we don't try to + * sync on it. */ + GST_ELEMENT_WARNING (basesink, STREAM, FAILED, + (NULL), ("unexpected buffer out of segment found.")); + GST_LOG_OBJECT (basesink, "buffer skipped, not in segment"); + return FALSE; } -in_flushing: - { - gst_mini_object_unref (obj); - GST_PAD_PREROLL_UNLOCK (pad); - GST_DEBUG_OBJECT (basesink, "sink is flushing"); +} - return GST_FLOW_WRONG_STATE; +/* with STREAM_LOCK, PREROLL_LOCK + * + * Waits for the clock to reach @time. If @time is not valid, no + * synchronisation is done. + * If synchronisation is disabled in the element or there is no + * clock, no synchronisation is done. + * Else a blocking wait is performed on the clock. We save the ClockID + * so we can unlock the entry at any time. While we are blocking, we + * release the PREROLL_LOCK so that other threads can interrupt the entry. + */ +static GstClockReturn +gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time) +{ + GstClockID id; + GstClockReturn ret; + GstClock *clock; + GstClockTime base_time; + + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) + goto invalid_time; + + GST_OBJECT_LOCK (basesink); + if (G_UNLIKELY (!basesink->sync)) + goto no_sync; + + if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL)) + goto no_clock; + + base_time = GST_ELEMENT_CAST (basesink)->base_time; + id = gst_clock_new_single_shot_id (clock, base_time + time); + GST_OBJECT_UNLOCK (basesink); + + basesink->clock_id = id; + /* release the preroll lock while waiting */ + GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); + + ret = gst_clock_id_wait (id, NULL); + + GST_PAD_PREROLL_LOCK (basesink->sinkpad); + gst_clock_id_unref (id); + basesink->clock_id = NULL; + + return ret; + +invalid_time: + { + GST_DEBUG_OBJECT (basesink, "time not valid, no sync needed"); + return GST_CLOCK_OK; + } +no_sync: + { + GST_DEBUG_OBJECT (basesink, "sync disabled"); + GST_OBJECT_UNLOCK (basesink); + return GST_CLOCK_OK; + } +no_clock: + { + GST_DEBUG_OBJECT (basesink, "no clock, can't sync"); + GST_OBJECT_UNLOCK (basesink); + return GST_CLOCK_OK; + } +} + +/* with STREAM_LOCK, PREROLL_LOCK + * + * Make sure we are in PLAYING and synchronize an object to the clock. + * + * If we need preroll, we are not in PLAYING. We try to commit the state + * if needed and then block if we still are not PLAYING. + * + * We start waiting on the clock in PLAYING. If we got interrupted, we + * immediatly try to repreroll. + * + * Some objects do not need synchronisation (most events) and so this function + * immediatly returns GST_FLOW_OK. + * + * does not take ownership of obj. + */ +static GstFlowReturn +gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj) +{ + GstClockTime start, stop; + gboolean syncable; + GstClockReturn status = GST_CLOCK_OK; + + /* get timing information for this object */ + start = stop = -1; + syncable = gst_base_sink_get_sync_times (basesink, obj, &start, &stop); + + /* a syncable object needs to participate in preroll and + * clocking. All buffers and EOS are syncable. */ + if (G_UNLIKELY (!syncable)) + goto not_syncable; + +again: + /* first do preroll, this makes sure we commit our state + * to PAUSED and can continue to PLAYING. We cannot perform + * any clock sync in PAUSED because there is no clock. + */ + while (G_UNLIKELY (basesink->need_preroll)) { + if (G_LIKELY (basesink->playing_async)) { + basesink->playing_async = FALSE; + /* commit state */ + if (G_UNLIKELY (!gst_base_sink_commit_state (basesink))) + goto stopping; + } + + /* need to recheck here because the commit state could have + * made us not need the preroll anymore */ + if (G_LIKELY (basesink->need_preroll)) { + /* block until the state changes, or we get a flush, or something */ + GST_DEBUG_OBJECT (basesink, "waiting to finish preroll"); + basesink->have_preroll = TRUE; + GST_PAD_PREROLL_WAIT (pad); + basesink->have_preroll = FALSE; + GST_DEBUG_OBJECT (basesink, "done preroll"); + if (G_UNLIKELY (basesink->flushing)) + goto flushing; + } + } + + /* preroll done, we can sync since we ar in PLAYING now. */ + GST_DEBUG_OBJECT (basesink, "waiting for clock"); + basesink->end_time = stop; + status = gst_base_sink_wait_clock (basesink, start); + GST_DEBUG_OBJECT (basesink, "clock returned %d", status); + + /* waiting could be interrupted and we can be flushing now */ + if (G_UNLIKELY (basesink->flushing)) + goto flushing; + + /* check for unlocked by a state change, we are not flushing so + * we can try to preroll on the current buffer. */ + if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) { + GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more"); + goto again; + } + + /* FIXME, update clock stats here and do some QoS */ + + return GST_FLOW_OK; + + /* ERRORS */ +not_syncable: + { + GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj); + return GST_FLOW_OK; } flushing: { - gst_base_sink_preroll_queue_flush (basesink, pad); - GST_PAD_PREROLL_UNLOCK (pad); - GST_DEBUG_OBJECT (basesink, "sink is flushing"); - + GST_DEBUG_OBJECT (basesink, "we are flushing"); return GST_FLOW_WRONG_STATE; } stopping: { - GST_PAD_PREROLL_UNLOCK (pad); - GST_DEBUG_OBJECT (basesink, "stopping"); - + GST_DEBUG_OBJECT (basesink, "stopping while commiting state"); return GST_FLOW_WRONG_STATE; } +} + +/* with STREAM_LOCK, PREROLL_LOCK, + * + * Synchronize the object on the clock and then render it. + * + * takes ownership of obj. + */ +static GstFlowReturn +gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstBaseSinkClass *bclass; + + /* synchronize this object */ + ret = gst_base_sink_do_sync (basesink, pad, obj); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto sync_failed; + + /* and now render */ + if (G_LIKELY (GST_IS_BUFFER (obj))) { + bclass = GST_BASE_SINK_GET_CLASS (basesink); + + GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj); + if (G_LIKELY (bclass->render)) + ret = bclass->render (basesink, GST_BUFFER_CAST (obj)); + } else { + GstEvent *event = GST_EVENT_CAST (obj); + gboolean ok = TRUE; + + bclass = GST_BASE_SINK_GET_CLASS (basesink); + + GST_DEBUG_OBJECT (basesink, "rendering event %p", obj); + if (bclass->event) + ok = bclass->event (basesink, event); + + if (G_LIKELY (ok)) { + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + /* the EOS event is completely handled so we mark + * ourselves as being in the EOS state. eos is also + * protected by the object lock so we can read it when + * answering the POSITION query. */ + GST_OBJECT_LOCK (basesink); + basesink->eos = TRUE; + GST_OBJECT_UNLOCK (basesink); + /* ok, now we can post the message */ + GST_DEBUG_OBJECT (basesink, "Now posting EOS"); + gst_element_post_message (GST_ELEMENT_CAST (basesink), + gst_message_new_eos (GST_OBJECT_CAST (basesink))); + break; + case GST_EVENT_NEWSEGMENT: + /* configure the segment */ + gst_base_sink_configure_segment (basesink, pad, event, + &basesink->segment); + default: + break; + } + } + } + + GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj); + gst_mini_object_unref (obj); + + return ret; + + /* ERRORS */ +sync_failed: + { + GST_DEBUG_OBJECT (basesink, "do_sync returned %s, unref object %p", + gst_flow_get_name (ret), obj); + gst_mini_object_unref (obj); + + return ret; + } +} + +/* with STREAM_LOCK, PREROLL_LOCK + * + * Perform preroll on the given object. For buffers this means + * calling the preroll subclass method. + * If that succeeds, the state will be commited. + * + * function does not take ownership of obj. + */ +static GstFlowReturn +gst_base_sink_preroll_object (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj) +{ + GstFlowReturn ret; + + GST_DEBUG_OBJECT (basesink, "do preroll %p", obj); + + /* if it's a buffer, we need to call the preroll method */ + if (G_LIKELY (GST_IS_BUFFER (obj))) { + GstBaseSinkClass *bclass; + GstBuffer *buf = GST_BUFFER_CAST (obj); + + GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); + + bclass = GST_BASE_SINK_GET_CLASS (basesink); + if (bclass->preroll) + if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK) + goto preroll_failed; + } + + /* commit state */ + if (G_LIKELY (basesink->playing_async)) { + basesink->playing_async = FALSE; + if (G_UNLIKELY (!gst_base_sink_commit_state (basesink))) + goto stopping; + } + + return GST_FLOW_OK; + + /* ERRORS */ preroll_failed: { - GST_DEBUG_OBJECT (basesink, "preroll failed"); - gst_base_sink_preroll_queue_flush (basesink, pad); - GST_PAD_PREROLL_UNLOCK (pad); + GST_DEBUG_OBJECT (basesink, "preroll failed, abort state"); + gst_element_abort_state (GST_ELEMENT_CAST (basesink)); + return ret; + } +stopping: + { + GST_DEBUG_OBJECT (basesink, "stopping while commiting state"); + return GST_FLOW_WRONG_STATE; + } +} - GST_DEBUG_OBJECT (basesink, "abort state"); - gst_element_abort_state (GST_ELEMENT (basesink)); +/* with STREAM_LOCK, PREROLL_LOCK + * + * Queue an object for rendering. + * The first prerollable object queued will complete the preroll. If the + * preroll queue if filled, we render all the objects in the queue. + * + * This function takes ownership of the object. + */ +static GstFlowReturn +gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj, gboolean prerollable) +{ + GstFlowReturn ret = GST_FLOW_OK; + gint length; + if (G_LIKELY (prerollable)) + basesink->preroll_queued++; + + GST_DEBUG_OBJECT (basesink, "now %d prerolled items", + basesink->preroll_queued); + + if (G_UNLIKELY (basesink->need_preroll)) { + length = basesink->preroll_queued; + + if (length == 1) { + ret = gst_base_sink_preroll_object (basesink, pad, obj); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto preroll_failed; + } + /* need to recheck, commmit state during preroll could have made us + * not need more preroll. */ + if (G_UNLIKELY (basesink->need_preroll)) { + GQueue *q; + + /* see if we can render now. */ + if (G_UNLIKELY (length <= basesink->preroll_queue_max_len)) + goto more_preroll; + + /* we can start rendering (or blocking) */ + GST_DEBUG_OBJECT (basesink, "emptying queue"); + q = basesink->preroll_queue; + while (G_UNLIKELY (!g_queue_is_empty (q))) { + /* FIXME, do something with the return value? */ + ret = gst_base_sink_render_object (basesink, pad, g_queue_pop_head (q)); + } + } + } + + /* now render the object */ + ret = gst_base_sink_render_object (basesink, pad, obj); + basesink->preroll_queued = 0; + + return ret; + + /* special cases */ +preroll_failed: + { + GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s", + gst_flow_get_name (ret)); + gst_mini_object_unref (obj); return ret; } more_preroll: { - GST_DEBUG_OBJECT (basesink, "need more preroll data"); - GST_PAD_PREROLL_UNLOCK (pad); + /* add object to the queue and return */ + GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d", + length, basesink->preroll_queue_max_len); + g_queue_push_tail (basesink->preroll_queue, obj); return GST_FLOW_OK; } } +/* with STREAM_LOCK + * + * This function grabs the PREROLL_LOCK and adds the object to + * the queue. + * + * This function takes ownership of obj. + */ +static GstFlowReturn +gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj, gboolean prerollable) +{ + GstFlowReturn ret; + + GST_PAD_PREROLL_LOCK (pad); + if (G_UNLIKELY (basesink->flushing)) + goto flushing; + + ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable); + GST_PAD_PREROLL_UNLOCK (pad); + + return ret; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (basesink, "sink is flushing"); + GST_PAD_PREROLL_UNLOCK (pad); + gst_mini_object_unref (obj); + return GST_FLOW_WRONG_STATE; + } +} + static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event) { @@ -768,46 +1076,32 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) { GstFlowReturn ret; - /* EOS also finishes the preroll */ + /* EOS is a prerollable object */ ret = - gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (event)); + gst_base_sink_queue_object (basesink, pad, + GST_MINI_OBJECT_CAST (event), TRUE); + + if (G_UNLIKELY (ret != GST_FLOW_OK)) + result = FALSE; break; } case GST_EVENT_NEWSEGMENT: { - /* the new segment is a non prerollable item and does not block anything, - * we can just configure the current segment and return. */ - /* FIXME, if the preroll-queue-len > 1 we need to queue the newsegment - * and activate them before attempting the synchronize */ - gboolean update; - gdouble rate; - GstFormat format; - gint64 start; - gint64 stop; - gint64 time; - - /* the newsegment event is needed to bring the buffer timestamps to the - * stream time and to drop samples outside of the playback segment. */ - gst_event_parse_new_segment (event, &update, &rate, &format, - &start, &stop, &time); + GstFlowReturn ret; basesink->have_newsegment = TRUE; - GST_OBJECT_LOCK (basesink); - gst_segment_set_newsegment (&basesink->segment, update, rate, format, - start, stop, time); + /* the new segment is a non prerollable item and does not block anything, + * we need to configure the current clipping segment and insert the event + * in the queue to serialize it with the buffers for rendering. */ + gst_base_sink_configure_segment (basesink, pad, event, + basesink->abidata.ABI.clip_segment); - GST_DEBUG_OBJECT (basesink, - "received NEWSEGMENT %" GST_TIME_FORMAT " -- %" - GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %" - GST_TIME_FORMAT, - GST_TIME_ARGS (basesink->segment.start), - GST_TIME_ARGS (basesink->segment.stop), - GST_TIME_ARGS (basesink->segment.time), - GST_TIME_ARGS (basesink->segment.accum)); - GST_OBJECT_UNLOCK (basesink); - - gst_event_unref (event); + ret = + gst_base_sink_queue_object (basesink, pad, + GST_MINI_OBJECT_CAST (event), FALSE); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + result = FALSE; break; } case GST_EVENT_FLUSH_START: @@ -824,7 +1118,8 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) GST_PAD_STREAM_LOCK (pad); /* and we need to commit our state again on the next * prerolled buffer */ - gst_element_lost_state (GST_ELEMENT (basesink)); + basesink->playing_async = TRUE; + gst_element_lost_state (GST_ELEMENT_CAST (basesink)); GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); GST_PAD_STREAM_UNLOCK (pad); @@ -839,6 +1134,8 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) /* we need new segment info after the flush. */ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (basesink->abidata.ABI.clip_segment, + GST_FORMAT_UNDEFINED); basesink->have_newsegment = FALSE; GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); @@ -848,8 +1145,8 @@ gst_base_sink_event (GstPad * pad, GstEvent * event) /* other events are sent to queue or subclass depending on if they * are serialized. */ if (GST_EVENT_IS_SERIALIZED (event)) { - gst_base_sink_handle_object (basesink, pad, - GST_MINI_OBJECT_CAST (event)); + gst_base_sink_queue_object (basesink, pad, + GST_MINI_OBJECT_CAST (event), FALSE); } else { if (bclass->event) bclass->event (basesink, event); @@ -895,273 +1192,76 @@ gst_base_sink_is_prerolled (GstBaseSink * basesink) return res; } -/* with STREAM_LOCK, PREROLL_LOCK, id should be a valid GstClockID */ -static GstClockReturn -gst_base_sink_wait (GstBaseSink * basesink, GstClockID id) -{ - GstClockReturn ret; - - basesink->clock_id = id; - /* release the preroll lock while waiting */ - GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); - - ret = gst_clock_id_wait (id, NULL); - - GST_PAD_PREROLL_LOCK (basesink->sinkpad); - gst_clock_id_unref (id); - basesink->clock_id = NULL; - - if (basesink->flushing) - ret = GST_CLOCK_UNSCHEDULED; - - return ret; -} - -/* perform synchronisation on a buffer +/* with STREAM_LOCK, PREROLL_LOCK * - * 1) check if we have a clock, if not, do nothing - * 2) calculate the start and end time of the buffer - * 3) create a single shot notification to wait on - * the clock, save the entry so we can unlock it - * 4) wait on the clock, this blocks - * 5) unref the clockid again + * Takes a buffer and compare the timestamps with the last segment. + * If the buffer falls outside of the segment boundaries, drop it. + * Else queue the buffer for preroll and rendering. * - * Is called with STREAM_LOCK + * This function takes ownership of the buffer. */ -static GstClockReturn -gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) +static GstFlowReturn +gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad, + GstBuffer * buf) { - GstClockReturn result; - GstClockTime start, end; - gint64 cstart, cend; - GstBaseSinkClass *bclass; - GstClockTime base_time; - GstClock *clock; - GstClockID id; - GstClockTimeDiff stream_start, stream_end; + GstFlowReturn result; + GstClockTime start = -1, end = -1; - bclass = GST_BASE_SINK_GET_CLASS (basesink); + if (G_UNLIKELY (basesink->flushing)) + goto flushing; - start = end = -1; - if (bclass->get_times) - bclass->get_times (basesink, buffer, &start, &end); + if (G_UNLIKELY (!basesink->have_newsegment)) { + GST_ELEMENT_WARNING (basesink, STREAM, FAILED, + (_("Internal data flow problem.")), + ("Received buffer without a new-segment. Cannot sync to clock.")); + basesink->have_newsegment = TRUE; + /* this means this sink will not be able to sync to the clock */ + basesink->abidata.ABI.clip_segment->start = -1; + basesink->abidata.ABI.clip_segment->stop = -1; + basesink->segment.start = -1; + basesink->segment.stop = -1; + } + + /* check if the buffer needs to be dropped */ + /* we don't use the subclassed method as it may not return + * valid values for our purpose here */ + gst_base_sink_get_times (basesink, buf, &start, &end); GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end)); - /* if we don't have a timestamp, we don't sync */ - if (!GST_CLOCK_TIME_IS_VALID (start)) - goto invalid_start; - - if (basesink->segment.format == GST_FORMAT_TIME) { - /* clip */ - if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME, - (gint64) start, (gint64) end, &cstart, &cend)) + if (GST_CLOCK_TIME_IS_VALID (start) && + (basesink->abidata.ABI.clip_segment->format == GST_FORMAT_TIME)) { + if (G_UNLIKELY (!gst_segment_clip (basesink->abidata.ABI.clip_segment, + GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL))) goto out_of_segment; - - if (start != cstart || end != cend) { - GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT - ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart), - GST_TIME_ARGS (cend)); - } - - /* save last valid times seen. */ - if (GST_CLOCK_TIME_IS_VALID (cend)) - gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, - (gint64) cend); - else - gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME, - (gint64) cstart); - } else { - /* no clipping for formats different from GST_FORMAT_TIME */ - cstart = start; - cend = end; } - if (!((basesink->segment.format == GST_FORMAT_TIME) - || (basesink->segment.accum == 0))) - goto no_segment; - - GST_OBJECT_LOCK (basesink); - if (!basesink->sync) - goto no_sync; - - if ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL) - goto no_clock; - - /* now do clocking, LOCK is helt */ - stream_start = - gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cstart); - - base_time = GST_ELEMENT_CAST (basesink)->base_time; - id = gst_clock_new_single_shot_id (clock, stream_start + base_time); - GST_OBJECT_UNLOCK (basesink); - - GST_LOG_OBJECT (basesink, - "waiting for clock, base time %" GST_TIME_FORMAT - " stream_start %" GST_TIME_FORMAT, - GST_TIME_ARGS (base_time), GST_TIME_ARGS (stream_start)); - - /* also save end_time of this buffer so that we can wait - * to signal EOS */ - stream_end = - gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cend); - - if (GST_CLOCK_TIME_IS_VALID (stream_end)) - basesink->end_time = stream_end + base_time; - else - basesink->end_time = GST_CLOCK_TIME_NONE; - - result = gst_base_sink_wait (basesink, id); - - GST_LOG_OBJECT (basesink, "clock entry done: %d", result); + /* now we can process the buffer in the queue, this function takes ownership + * of the buffer */ + result = + gst_base_sink_queue_object_unlocked (basesink, pad, + GST_MINI_OBJECT_CAST (buf), TRUE); return result; - /* special cases */ -invalid_start: + /* ERRORS */ +flushing: { - GST_DEBUG_OBJECT (basesink, "start not valid, cannot sync"); - return GST_CLOCK_OK; + GST_DEBUG_OBJECT (basesink, "sink is flushing"); + gst_buffer_unref (buf); + return GST_FLOW_WRONG_STATE; } out_of_segment: { - GST_LOG_OBJECT (basesink, "buffer skipped, not in segment"); - /* we return BADTIME to make callers drop this buffer. */ - return GST_CLOCK_BADTIME; - } -no_sync: - { - GST_DEBUG_OBJECT (basesink, "no need to sync"); - GST_OBJECT_UNLOCK (basesink); - return GST_CLOCK_OK; - } -no_clock: - { - GST_DEBUG_OBJECT (basesink, "no clock, can't sync"); - GST_OBJECT_UNLOCK (basesink); - return GST_CLOCK_OK; - } -no_segment: - { - GST_DEBUG_OBJECT (basesink, "no segment info, can't sync"); - return GST_CLOCK_OK; + GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment"); + gst_buffer_unref (buf); + return GST_FLOW_OK; } } - -/* handle an event - * - * 2) render the event - * 3) unref the event - * - * called with STREAM_LOCK, PREROLL_LOCK +/* with STREAM_LOCK */ -static inline gboolean -gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event) -{ - GstBaseSinkClass *bclass; - gboolean ret; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_EOS: - { - GstClock *clock; - gboolean unlock = TRUE; - - GST_OBJECT_LOCK (basesink); - basesink->eos = TRUE; - basesink->eos_queued = FALSE; - if ((clock = GST_ELEMENT_CLOCK (basesink)) != NULL) { - /* wait for last buffer to finish if we have a valid end time */ - if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) { - GstClockID id; - - id = gst_clock_new_single_shot_id (clock, basesink->end_time); - unlock = FALSE; - GST_OBJECT_UNLOCK (basesink); - - gst_base_sink_wait (basesink, id); - basesink->end_time = GST_CLOCK_TIME_NONE; - } - } - if (unlock) - GST_OBJECT_UNLOCK (basesink); - break; - } - default: - break; - } - - bclass = GST_BASE_SINK_GET_CLASS (basesink); - if (bclass->event) - ret = bclass->event (basesink, event); - else - ret = TRUE; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_EOS: - /* if we are still EOS, we can post the EOS message */ - if (basesink->eos) { - /* ok, now we can post the message */ - GST_DEBUG_OBJECT (basesink, "Now posting EOS"); - gst_element_post_message (GST_ELEMENT_CAST (basesink), - gst_message_new_eos (GST_OBJECT_CAST (basesink))); - } - break; - default: - break; - } - - GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event); - gst_event_unref (event); - - return ret; -} - -/* handle a buffer - * - * 1) first sync on the buffer - * 2) render the buffer - * 3) unref the buffer - * - * called with STREAM_LOCK, PREROLL_LOCK - */ -static inline GstFlowReturn -gst_base_sink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf) -{ - GstFlowReturn ret = GST_FLOW_OK; - GstClockReturn status; - - status = gst_base_sink_do_sync (basesink, buf); - switch (status) { - case GST_CLOCK_EARLY: - GST_DEBUG_OBJECT (basesink, "buffer too late!, rendering anyway"); - /* fallthrough for now */ - case GST_CLOCK_OK: - { - GstBaseSinkClass *bclass; - - bclass = GST_BASE_SINK_GET_CLASS (basesink); - if (bclass->render) - ret = bclass->render (basesink, buf); - break; - } - case GST_CLOCK_BADTIME: - /* when out of segment, should normally not happen */ - case GST_CLOCK_UNSCHEDULED: - /* unlocked by a state change */ - default: - GST_DEBUG_OBJECT (basesink, "clock returned %d, not rendering", status); - break; - } - - GST_DEBUG_OBJECT (basesink, "buffer unref after render %p", basesink, buf); - gst_buffer_unref (buf); - - return ret; -} - static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buf) { @@ -1170,11 +1270,12 @@ gst_base_sink_chain (GstPad * pad, GstBuffer * buf) basesink = GST_BASE_SINK (gst_pad_get_parent (pad)); - if (!(basesink->pad_mode == GST_ACTIVATE_PUSH)) + if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) goto wrong_mode; - result = - gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT_CAST (buf)); + GST_PAD_PREROLL_LOCK (pad); + result = gst_base_sink_chain_unlocked (basesink, pad, buf); + GST_PAD_PREROLL_UNLOCK (pad); done: gst_object_unref (basesink); @@ -1185,9 +1286,11 @@ done: wrong_mode: { GST_OBJECT_LOCK (pad); - g_warning ("Push on pad %s:%s, but it was not activated in push mode", + GST_WARNING_OBJECT (basesink, + "Push on pad %s:%s, but it was not activated in push mode", GST_DEBUG_PAD_NAME (pad)); GST_OBJECT_UNLOCK (pad); + gst_buffer_unref (buf); /* we don't post an error message this will signal to the peer * pushing that EOS is reached. */ result = GST_FLOW_UNEXPECTED; @@ -1195,6 +1298,8 @@ wrong_mode: } } +/* with STREAM_LOCK + */ static void gst_base_sink_loop (GstPad * pad) { @@ -1207,25 +1312,47 @@ gst_base_sink_loop (GstPad * pad) g_assert (basesink->pad_mode == GST_ACTIVATE_PULL); result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf); - if (result != GST_FLOW_OK) + if (G_UNLIKELY (result != GST_FLOW_OK)) goto paused; - result = gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (buf)); - if (result != GST_FLOW_OK) + if (G_UNLIKELY (buf == NULL)) + goto no_buffer; + + GST_PAD_PREROLL_LOCK (pad); + result = gst_base_sink_chain_unlocked (basesink, pad, buf); + GST_PAD_PREROLL_UNLOCK (pad); + if (G_UNLIKELY (result != GST_FLOW_OK)) goto paused; gst_object_unref (basesink); - /* default */ return; + /* ERRORS */ paused: { - gst_base_sink_event (pad, gst_event_new_eos ()); - gst_object_unref (basesink); + GST_LOG_OBJECT (basesink, "pausing task, reason %s", + gst_flow_get_name (result)); gst_pad_pause_task (pad); + /* fatal errors and NOT_LINKED cause EOS */ + if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) { + gst_base_sink_event (pad, gst_event_new_eos ()); + /* EOS does not cause an ERROR message */ + if (result != GST_FLOW_UNEXPECTED) { + GST_ELEMENT_ERROR (basesink, STREAM, FAILED, + (_("Internal data stream error.")), + ("stream stopped, reason %s", gst_flow_get_name (result))); + } + } + gst_object_unref (basesink); return; } +no_buffer: + { + GST_LOG_OBJECT (basesink, "no buffer, pausing"); + result = GST_FLOW_ERROR; + goto paused; + } } static gboolean @@ -1363,6 +1490,8 @@ gst_base_sink_activate_pull (GstPad * pad, gboolean active) * mode works just fine without having a newsegment before the * first buffer */ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (basesink->abidata.ABI.clip_segment, + GST_FORMAT_UNDEFINED); basesink->have_newsegment = TRUE; /* set the pad mode before starting the task so that it's in the @@ -1397,6 +1526,7 @@ gst_base_sink_activate_pull (GstPad * pad, gboolean active) return result; } +/* send an event to our sinkpad peer. */ static gboolean gst_base_sink_send_event (GstElement * element, GstEvent * event) { @@ -1455,7 +1585,7 @@ gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format, goto wrong_state; /* and we need a clock */ - if ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL) + if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL)) goto wrong_state; /* collect all data we need holding the lock */ @@ -1572,32 +1702,40 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_PAUSED: /* need to complete preroll before this state change completes, there * is no data flow in READY so we can safely assume we need to preroll. */ - basesink->offset = 0; GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll"); + gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (basesink->abidata.ABI.clip_segment, + GST_FORMAT_UNDEFINED); + basesink->have_newsegment = FALSE; + basesink->offset = 0; basesink->have_preroll = FALSE; basesink->need_preroll = TRUE; - gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED); - basesink->have_newsegment = FALSE; + basesink->playing_async = TRUE; + basesink->eos = FALSE; ret = GST_STATE_CHANGE_ASYNC; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING"); GST_PAD_PREROLL_LOCK (basesink->sinkpad); - if (!gst_base_sink_is_prerolled (basesink)) { - ret = GST_STATE_CHANGE_ASYNC; - basesink->need_preroll = TRUE; - } else { + if (gst_base_sink_is_prerolled (basesink)) { /* no preroll needed anymore now. */ + GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll"); + basesink->playing_async = FALSE; basesink->need_preroll = FALSE; if (basesink->eos) { /* need to post EOS message here */ GST_DEBUG_OBJECT (basesink, "Now posting EOS"); gst_element_post_message (GST_ELEMENT_CAST (basesink), gst_message_new_eos (GST_OBJECT_CAST (basesink))); + } else { + GST_DEBUG_OBJECT (basesink, "signal preroll"); + GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); } + } else { + GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, need preroll"); + basesink->need_preroll = TRUE; + basesink->playing_async = TRUE; + ret = GST_STATE_CHANGE_ASYNC; } - GST_DEBUG_OBJECT (basesink, "signal preroll"); - GST_PAD_PREROLL_SIGNAL (basesink->sinkpad); GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); break; default: @@ -1608,12 +1746,13 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) GstStateChangeReturn bret; bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - if (bret == GST_STATE_CHANGE_FAILURE) + if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE)) goto activate_failed; } switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED"); GST_PAD_PREROLL_LOCK (basesink->sinkpad); basesink->need_preroll = TRUE; if (basesink->clock_id) { @@ -1623,11 +1762,13 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) if (bclass->unlock) bclass->unlock (basesink); - GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, waiting for lock"); /* if we don't have a preroll buffer we need to wait for a preroll and * return ASYNC. */ - if (!gst_base_sink_is_prerolled (basesink)) { + if (gst_base_sink_is_prerolled (basesink)) { + basesink->playing_async = FALSE; + } else { GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll"); + basesink->playing_async = TRUE; ret = GST_STATE_CHANGE_ASYNC; } GST_PAD_PREROLL_UNLOCK (basesink->sinkpad); @@ -1637,7 +1778,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_NULL: if (bclass->stop) if (!bclass->stop (basesink)) { - GST_WARNING ("failed to stop"); + GST_WARNING_OBJECT (basesink, "failed to stop"); } break; default: diff --git a/libs/gst/base/gstbasesink.h b/libs/gst/base/gstbasesink.h index c0b4ed5954..a47e10455a 100644 --- a/libs/gst/base/gstbasesink.h +++ b/libs/gst/base/gstbasesink.h @@ -88,7 +88,15 @@ struct _GstBaseSink { gboolean flushing; /*< private >*/ - gpointer _gst_reserved[GST_PADDING_LARGE]; + union { + struct { + /* segment used for clipping incomming buffers */ + GstSegment *clip_segment; + } ABI; + /* adding + 0 to mark ABI change to be undone later */ + gpointer _gst_reserved[GST_PADDING_LARGE + 0]; + } abidata; + }; struct _GstBaseSinkClass { diff --git a/tests/check/elements/fakesink.c b/tests/check/elements/fakesink.c index a96dce01eb..0ca1a5203c 100644 --- a/tests/check/elements/fakesink.c +++ b/tests/check/elements/fakesink.c @@ -166,7 +166,7 @@ GST_START_TEST (test_clipping) fail_unless (current == GST_STATE_PAUSED); fail_unless (pending == GST_STATE_VOID_PENDING); - /* pause should render the buffer */ + /* playing should render the buffer */ ret = gst_element_set_state (sink, GST_STATE_PLAYING); fail_unless (ret == GST_STATE_CHANGE_SUCCESS); @@ -217,6 +217,81 @@ GST_START_TEST (test_clipping) GST_END_TEST; +GST_START_TEST (test_preroll_sync) +{ + GstElement *pipeline, *sink; + GstPad *sinkpad; + GstStateChangeReturn ret; + + /* create sink */ + pipeline = gst_pipeline_new ("pipeline"); + fail_if (pipeline == NULL); + + sink = gst_element_factory_make ("fakesink", "sink"); + fail_if (sink == NULL); + g_object_set (G_OBJECT (sink), "sync", TRUE, NULL); + + gst_bin_add (GST_BIN (pipeline), sink); + + sinkpad = gst_element_get_pad (sink, "sink"); + fail_if (sinkpad == NULL); + + /* make pipeline and element ready to accept data */ + ret = gst_element_set_state (pipeline, GST_STATE_PAUSED); + fail_unless (ret == GST_STATE_CHANGE_ASYNC); + + /* send segment */ + { + GstEvent *segment; + gboolean eret; + + GST_DEBUG ("sending segment"); + segment = gst_event_new_new_segment (FALSE, + 1.0, GST_FORMAT_TIME, 0 * GST_SECOND, 2 * GST_SECOND, 0 * GST_SECOND); + + eret = gst_pad_send_event (sinkpad, segment); + fail_if (eret == FALSE); + } + + /* send buffer that should block and finish preroll */ + { + GstBuffer *buffer; + GstFlowReturn fret; + ChainData *data; + GstState current, pending; + + buffer = gst_buffer_new (); + GST_BUFFER_TIMESTAMP (buffer) = 1 * GST_SECOND; + GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND; + + GST_DEBUG ("sending buffer to finish preroll"); + data = chain_async (sinkpad, buffer); + fail_if (data == NULL); + + /* state should now eventually change to PAUSED */ + ret = + gst_element_get_state (pipeline, ¤t, &pending, + GST_CLOCK_TIME_NONE); + fail_unless (ret == GST_STATE_CHANGE_SUCCESS); + fail_unless (current == GST_STATE_PAUSED); + fail_unless (pending == GST_STATE_VOID_PENDING); + + /* playing should render the buffer */ + ret = gst_element_set_state (pipeline, GST_STATE_PLAYING); + fail_unless (ret == GST_STATE_CHANGE_SUCCESS); + + /* and we should get a success return value */ + fret = chain_async_return (data); + fail_if (fret != GST_FLOW_OK); + } + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE); + gst_object_unref (sinkpad); + gst_object_unref (pipeline); +} + +GST_END_TEST; + Suite * fakesink_suite (void) { @@ -225,6 +300,7 @@ fakesink_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_clipping); + tcase_add_test (tc_chain, test_preroll_sync); return s; }