diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index a0e2c736c8..4e597235a5 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -204,59 +204,64 @@ enum #define GST_BASE_SRC_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate)) +/* The basesrc implementation need to respect the following locking order: + * 1. STREAM_LOCK + * 2. LIVE_LOCK + * 3. OBJECT_LOCK + */ struct _GstBaseSrcPrivate { - gboolean discont; - gboolean flushing; + gboolean discont; /* STREAM_LOCK */ + gboolean flushing; /* LIVE_LOCK */ - GstFlowReturn start_result; - gboolean async; + GstFlowReturn start_result; /* OBJECT_LOCK */ + gboolean async; /* OBJECT_LOCK */ /* if a stream-start event should be sent */ - gboolean stream_start_pending; + gboolean stream_start_pending; /* STREAM_LOCK */ /* if segment should be sent and a * seqnum if it was originated by a seek */ - gboolean segment_pending; - guint32 segment_seqnum; + gboolean segment_pending; /* OBJECT_LOCK */ + guint32 segment_seqnum; /* OBJECT_LOCK */ /* if EOS is pending (atomic) */ - GstEvent *pending_eos; - gint has_pending_eos; + GstEvent *pending_eos; /* OBJECT_LOCK */ + gint has_pending_eos; /* atomic */ /* if the eos was caused by a forced eos from the application */ - gboolean forced_eos; + gboolean forced_eos; /* LIVE_LOCK */ /* startup latency is the time it takes between going to PLAYING and producing * the first BUFFER with running_time 0. This value is included in the latency * reporting. */ - GstClockTime latency; + GstClockTime latency; /* OBJECT_LOCK */ /* timestamp offset, this is the offset add to the values of gst_times for * pseudo live sources */ - GstClockTimeDiff ts_offset; + GstClockTimeDiff ts_offset; /* OBJECT_LOCK */ - gboolean do_timestamp; - volatile gint dynamic_size; - volatile gint automatic_eos; + gboolean do_timestamp; /* OBJECT_LOCK */ + volatile gint dynamic_size; /* atomic */ + volatile gint automatic_eos; /* atomic */ /* stream sequence number */ - guint32 seqnum; + guint32 seqnum; /* STREAM_LOCK */ /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be * pushed in the data stream */ - GList *pending_events; - volatile gint have_events; + GList *pending_events; /* OBJECT_LOCK */ + volatile gint have_events; /* OBJECT_LOCK */ /* QoS *//* with LOCK */ - gboolean qos_enabled; - gdouble proportion; - GstClockTime earliest_time; + gboolean qos_enabled; /* unused */ + gdouble proportion; /* OBJECT_LOCK */ + GstClockTime earliest_time; /* OBJECT_LOCK */ - GstBufferPool *pool; - GstAllocator *allocator; - GstAllocationParams params; + GstBufferPool *pool; /* OBJECT_LOCK */ + GstAllocator *allocator; /* OBJECT_LOCK */ + GstAllocationParams params; /* OBJECT_LOCK */ - GCond async_cond; + GCond async_cond; /* OBJECT_LOCK */ }; static GstElementClass *parent_class = NULL; @@ -328,7 +333,7 @@ static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query); static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc, - gboolean flushing, gboolean live_play, gboolean * playing); + gboolean flushing); static gboolean gst_base_src_start (GstBaseSrc * basesrc); static gboolean gst_base_src_stop (GstBaseSrc * basesrc); @@ -484,6 +489,31 @@ gst_base_src_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } +/* Call with LIVE_LOCK held */ +static GstFlowReturn +gst_base_src_wait_playing_unlocked (GstBaseSrc * src) +{ + while (G_UNLIKELY (!src->live_running && !src->priv->flushing)) { + /* block until the state changes, or we get a flush, or something */ + GST_DEBUG_OBJECT (src, "live source waiting for running state"); + GST_LIVE_WAIT (src); + GST_DEBUG_OBJECT (src, "live source unlocked"); + } + + if (src->priv->flushing) + goto flushing; + + return GST_FLOW_OK; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (src, "we are flushing"); + return GST_FLOW_FLUSHING; + } +} + + /** * gst_base_src_wait_playing: * @src: the src @@ -503,25 +533,15 @@ gst_base_src_finalize (GObject * object) GstFlowReturn gst_base_src_wait_playing (GstBaseSrc * src) { + GstFlowReturn ret; + g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR); - do { - /* block until the state changes, or we get a flush, or something */ - GST_DEBUG_OBJECT (src, "live source waiting for running state"); - GST_LIVE_WAIT (src); - GST_DEBUG_OBJECT (src, "live source unlocked"); - if (src->priv->flushing) - goto flushing; - } while (G_UNLIKELY (!src->live_running)); + GST_LIVE_LOCK (src); + ret = gst_base_src_wait_playing_unlocked (src); + GST_LIVE_UNLOCK (src); - return GST_FLOW_OK; - - /* ERRORS */ -flushing: - { - GST_DEBUG_OBJECT (src, "we are flushing"); - return GST_FLOW_FLUSHING; - } + return ret; } /** @@ -855,6 +875,7 @@ gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop, return res; } +/* called with STREAM_LOCK */ static gboolean gst_base_src_send_stream_start (GstBaseSrc * src) { @@ -1577,7 +1598,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) GstSeekFlags flags; GstSeekType start_type, stop_type; gint64 start, stop; - gboolean flush, playing; + gboolean flush; gboolean update; gboolean relative_seek = FALSE; gboolean seekseg_configured = FALSE; @@ -1629,7 +1650,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) /* unblock streaming thread. */ if (unlock) - gst_base_src_set_flushing (src, TRUE, FALSE, &playing); + gst_base_src_set_flushing (src, TRUE); /* grab streaming lock, this should eventually be possible, either * because the task is paused, our streaming thread stopped @@ -1645,7 +1666,7 @@ gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) } if (unlock) - gst_base_src_set_flushing (src, FALSE, playing, NULL); + gst_base_src_set_flushing (src, FALSE); /* If we configured the seeksegment above, don't overwrite it now. Otherwise * copy the current segment info into the temp segment that we can actually @@ -1762,51 +1783,30 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) /* bidirectional events */ case GST_EVENT_FLUSH_START: GST_DEBUG_OBJECT (src, "pushing flush-start event downstream"); - result = gst_pad_push_event (src->srcpad, event); - /* also unblock the create function */ - gst_base_src_activate_pool (src, FALSE); - /* unlock any subclasses, we need to do this before grabbing the - * LIVE_LOCK since we hold this lock before going into ::create. We pass an - * unlock to the params because of backwards compat (see seek handler)*/ - if (bclass->unlock) - bclass->unlock (src); - /* the live lock is released when we are blocked, waiting for playing or - * when we sync to the clock. */ - GST_LIVE_LOCK (src); - src->priv->flushing = TRUE; - /* clear pending EOS if any */ - if (g_atomic_int_get (&src->priv->has_pending_eos)) { - GST_OBJECT_LOCK (src); - CLEAR_PENDING_EOS (src); - src->priv->forced_eos = FALSE; - GST_OBJECT_UNLOCK (src); - } - if (bclass->unlock_stop) - bclass->unlock_stop (src); - if (src->clock_id) - gst_clock_id_unschedule (src->clock_id); - GST_DEBUG_OBJECT (src, "signal"); - GST_LIVE_SIGNAL (src); - GST_LIVE_UNLOCK (src); + result = gst_pad_push_event (src->srcpad, event); + gst_base_src_set_flushing (src, TRUE); event = NULL; break; case GST_EVENT_FLUSH_STOP: { gboolean start; - GST_LIVE_LOCK (src); - src->priv->segment_pending = TRUE; - src->priv->flushing = FALSE; + GST_PAD_STREAM_LOCK (src->srcpad); + gst_base_src_set_flushing (src, FALSE); + GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream"); result = gst_pad_push_event (src->srcpad, event); - gst_base_src_activate_pool (src, TRUE); + /* For external flush, restart the task .. */ + GST_LIVE_LOCK (src); + src->priv->segment_pending = TRUE; GST_OBJECT_LOCK (src->srcpad); start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH); GST_OBJECT_UNLOCK (src->srcpad); + /* ... and for live sources, only if in playing state */ if (src->is_live) { if (!src->live_running) start = FALSE; @@ -1815,7 +1815,10 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) if (start) gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, src->srcpad, NULL); + GST_LIVE_UNLOCK (src); + GST_PAD_STREAM_UNLOCK (src->srcpad); + event = NULL; break; } @@ -1823,47 +1826,68 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) /* downstream serialized events */ case GST_EVENT_EOS: { + gboolean push_mode; + /* queue EOS and make sure the task or pull function performs the EOS * actions. * - * We have two possibilities: + * For push mode, This will be done in 3 steps. It is required to not + * block here as gst_element_send_event() will hold the STATE_LOCK, hence + * blocking would prevent asynchronous state change to complete. * - * - Before we are to enter the _create function, we check the has_pending_eos - * first and do EOS instead of entering it. - * - If we are in the _create function or we did not manage to set the - * flag fast enough and we are about to enter the _create function, - * we unlock it so that we exit with FLUSHING immediately. We then - * check the EOS flag and do the EOS logic. + * 1. We stop the streaming thread + * 2. We set the pending eos + * 3. We start the streaming thread again, so it is performed + * asynchronously. + * + * For pull mode, we simply mark the pending EOS without flushing. */ - GST_OBJECT_LOCK (src); - g_atomic_int_set (&src->priv->has_pending_eos, TRUE); - if (src->priv->pending_eos) - gst_event_unref (src->priv->pending_eos); - src->priv->pending_eos = event; + + GST_OBJECT_LOCK (src->srcpad); + push_mode = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH; + GST_OBJECT_UNLOCK (src->srcpad); + + if (push_mode) { + gst_base_src_set_flushing (src, TRUE); + + GST_PAD_STREAM_LOCK (src->srcpad); + gst_base_src_set_flushing (src, FALSE); + + GST_OBJECT_LOCK (src); + g_atomic_int_set (&src->priv->has_pending_eos, TRUE); + if (src->priv->pending_eos) + gst_event_unref (src->priv->pending_eos); + src->priv->pending_eos = event; + GST_OBJECT_UNLOCK (src); + + GST_DEBUG_OBJECT (src, + "EOS marked, start task for asynchronous handling"); + gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, + src->srcpad, NULL); + + GST_PAD_STREAM_UNLOCK (src->srcpad); + } else { + /* In pull mode, we need not to return flushing to downstream, though + * the stream lock is not kept after getrange was unblocked */ + GST_OBJECT_LOCK (src); + g_atomic_int_set (&src->priv->has_pending_eos, TRUE); + if (src->priv->pending_eos) + gst_event_unref (src->priv->pending_eos); + src->priv->pending_eos = event; + GST_OBJECT_UNLOCK (src); + + gst_base_src_activate_pool (src, FALSE); + if (bclass->unlock) + bclass->unlock (src); + + GST_PAD_STREAM_LOCK (src->srcpad); + if (bclass->unlock_stop) + bclass->unlock_stop (src); + GST_PAD_STREAM_UNLOCK (src->srcpad); + } + + event = NULL; - GST_OBJECT_UNLOCK (src); - - GST_DEBUG_OBJECT (src, "EOS marked, calling unlock"); - - /* unlock the _create function so that we can check the has_pending_eos flag - * and we can do EOS. This will eventually release the LIVE_LOCK again so - * that we can grab it and stop the unlock again. We don't take the stream - * lock so that this operation is guaranteed to never block. */ - gst_base_src_activate_pool (src, FALSE); - if (bclass->unlock) - bclass->unlock (src); - - GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK"); - - GST_LIVE_LOCK (src); - GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop"); - /* now stop the unlock of the streaming thread again. Grabbing the live - * lock is enough because that protects the create function. */ - if (bclass->unlock_stop) - bclass->unlock_stop (src); - gst_base_src_activate_pool (src, TRUE); - GST_LIVE_UNLOCK (src); - result = TRUE; break; } @@ -2005,10 +2029,10 @@ gst_base_src_default_event (GstBaseSrc * src, GstEvent * event) case GST_EVENT_FLUSH_START: /* cancel any blocking getrange, is normally called * when in pull mode. */ - result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL); + result = gst_base_src_set_flushing (src, TRUE); break; case GST_EVENT_FLUSH_STOP: - result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL); + result = gst_base_src_set_flushing (src, FALSE); break; case GST_EVENT_QOS: { @@ -2427,7 +2451,7 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, again: if (src->is_live) { if (G_UNLIKELY (!src->live_running)) { - ret = gst_base_src_wait_playing (src); + ret = gst_base_src_wait_playing_unlocked (src); if (ret != GST_FLOW_OK) goto stopped; } @@ -2470,7 +2494,23 @@ again: res_buf = in_buf = *buf; + GST_LIVE_UNLOCK (src); ret = bclass->create (src, offset, length, &res_buf); + GST_LIVE_LOCK (src); + + /* As we released the LIVE_LOCK, the state may have changed */ + if (src->is_live) { + if (G_UNLIKELY (!src->live_running)) { + GstFlowReturn wait_ret; + wait_ret = gst_base_src_wait_playing_unlocked (src); + if (wait_ret != GST_FLOW_OK) { + if (ret == GST_FLOW_OK && *buf == NULL) + gst_buffer_unref (res_buf); + ret = wait_ret; + goto stopped; + } + } + } /* The create function could be unlocked because we have a pending EOS. It's * possible that we have a valid buffer from create that we need to @@ -2677,6 +2717,7 @@ start_failed: } } +/* Called with STREAM_LOCK */ static void gst_base_src_loop (GstPad * pad) { @@ -2698,6 +2739,14 @@ gst_base_src_loop (GstPad * pad) goto flushing; GST_LIVE_UNLOCK (src); + /* Just return if EOS is pushed again, as the app might be unaware that an + * EOS have been sent already */ + if (GST_PAD_IS_EOS (pad)) { + GST_DEBUG_OBJECT (src, "Pad is marked as EOS, pause the task"); + gst_pad_pause_task (pad); + goto done; + } + gst_base_src_send_stream_start (src); /* The stream-start event could've caused something to flush us */ @@ -3427,7 +3476,7 @@ gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret) /* stop flushing now but for live sources, still block in the LIVE lock when * we are not yet PLAYING */ - gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL); + gst_base_src_set_flushing (basesrc, FALSE); gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc)); @@ -3546,7 +3595,7 @@ gst_base_src_stop (GstBaseSrc * basesrc) GST_DEBUG_OBJECT (basesrc, "stopping source"); /* flush all */ - gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL); + gst_base_src_set_flushing (basesrc, TRUE); /* stop the task */ gst_pad_stop_task (basesrc->srcpad); @@ -3579,34 +3628,26 @@ was_stopped: /* start or stop flushing dataprocessing */ static gboolean -gst_base_src_set_flushing (GstBaseSrc * basesrc, - gboolean flushing, gboolean live_play, gboolean * playing) +gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing) { GstBaseSrcClass *bclass; bclass = GST_BASE_SRC_GET_CLASS (basesrc); - GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play); + GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing); if (flushing) { gst_base_src_activate_pool (basesrc, FALSE); - /* unlock any subclasses, we need to do this before grabbing the - * LIVE_LOCK since we hold this lock before going into ::create. We pass an - * unlock to the params because of backwards compat (see seek handler)*/ + /* unlock any subclasses to allow turning off the streaming thread */ if (bclass->unlock) bclass->unlock (basesrc); } - /* the live lock is released when we are blocked, waiting for playing or - * when we sync to the clock. */ + /* the live lock is released when we are blocked, waiting for playing, + * when we sync to the clock or creating a buffer */ GST_LIVE_LOCK (basesrc); - if (playing) - *playing = basesrc->live_running; basesrc->priv->flushing = flushing; if (flushing) { - /* if we are locked in the live lock, signal it to make it flush */ - basesrc->live_running = TRUE; - /* clear pending EOS if any */ if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) { GST_OBJECT_LOCK (basesrc); @@ -3615,17 +3656,10 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, GST_OBJECT_UNLOCK (basesrc); } - /* step 1, now that we have the LIVE lock, clear our unlock request */ - if (bclass->unlock_stop) - bclass->unlock_stop (basesrc); - - /* step 2, unblock clock sync (if any) or any other blocking thing */ + /* unblock clock sync (if any) or any other blocking thing */ if (basesrc->clock_id) gst_clock_id_unschedule (basesrc->clock_id); } else { - /* signal the live source that it can start playing */ - basesrc->live_running = live_play; - gst_base_src_activate_pool (basesrc, TRUE); /* Drop all delayed events */ @@ -3639,9 +3673,18 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, } GST_OBJECT_UNLOCK (basesrc); } + GST_LIVE_SIGNAL (basesrc); GST_LIVE_UNLOCK (basesrc); + if (!flushing) { + /* Now wait for the stream lock to be released and clear our unlock request */ + GST_PAD_STREAM_LOCK (basesrc->srcpad); + if (bclass->unlock_stop) + bclass->unlock_stop (basesrc); + GST_PAD_STREAM_UNLOCK (basesrc->srcpad); + } + return TRUE; } @@ -3650,17 +3693,6 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc, static gboolean gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play) { - GstBaseSrcClass *bclass; - - bclass = GST_BASE_SRC_GET_CLASS (basesrc); - - /* unlock subclasses locked in ::create, we only do this when we stop playing. */ - if (!live_play) { - GST_DEBUG_OBJECT (basesrc, "unlock"); - if (bclass->unlock) - bclass->unlock (basesrc); - } - /* we are now able to grab the LIVE lock, when we get it, we can be * waiting for PLAYING while blocked in the LIVE cond or we can be waiting * for the clock. */ @@ -3678,11 +3710,6 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play) if (live_play) { gboolean start; - /* clear our unlock request when going to PLAYING */ - GST_DEBUG_OBJECT (basesrc, "unlock stop"); - if (bclass->unlock_stop) - bclass->unlock_stop (basesrc); - /* for live sources we restart the timestamp correction */ GST_OBJECT_LOCK (basesrc); basesrc->priv->latency = -1; @@ -3840,7 +3867,7 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED"); if (gst_base_src_is_live (basesrc)) { - /* make sure we block in the live lock in PAUSED */ + /* make sure we block in the live cond in PAUSED */ gst_base_src_set_playing (basesrc, FALSE); no_preroll = TRUE; }