diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index 3d60e5c841..0386a7fb09 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -213,6 +213,9 @@ struct _GstBaseSrcPrivate gboolean discont; gboolean flushing; + GstFlowReturn start_result; + gboolean async; + /* if segment should be sent */ gboolean segment_pending; @@ -317,6 +320,7 @@ static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc, static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing); + static gboolean gst_base_src_start (GstBaseSrc * basesrc); static gboolean gst_base_src_stop (GstBaseSrc * basesrc); @@ -435,7 +439,9 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP; g_atomic_int_set (&basesrc->priv->have_events, FALSE); - GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED); + basesrc->priv->start_result = GST_FLOW_WRONG_STATE; + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED); + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE); GST_DEBUG_OBJECT (basesrc, "init done"); @@ -597,6 +603,49 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic) g_atomic_int_set (&src->priv->dynamic_size, dynamic); } +/** + * gst_base_src_set_async: + * @src: base source instance + * @async: new async mode + * + * Configure async behaviour in @src, no state change will block. The open, + * close, start, stop, play and pause virtual methods will be executed in a + * different thread and are thus allowed to perform blocking operations. Any + * blocking operation should be unblocked with the unlock vmethod. + */ +void +gst_base_src_set_async (GstBaseSrc * src, gboolean async) +{ + g_return_if_fail (GST_IS_BASE_SRC (src)); + + GST_OBJECT_LOCK (src); + src->priv->async = async; + GST_OBJECT_UNLOCK (src); +} + +/** + * gst_base_src_is_async: + * @src: base source instance + * + * Get the current async behaviour of @src. See also gst_base_src_set_async(). + * + * Returns: %TRUE if @src is operating in async mode. + */ +gboolean +gst_base_src_is_async (GstBaseSrc * src) +{ + gboolean res; + + g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE); + + GST_OBJECT_LOCK (src); + res = src->priv->async; + GST_OBJECT_UNLOCK (src); + + return res; +} + + /** * gst_base_src_query_latency: * @src: the source @@ -2179,7 +2228,7 @@ again: } } - if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED))) + if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src))) goto not_started; if (G_UNLIKELY (!bclass->create)) @@ -2364,13 +2413,23 @@ static gboolean gst_base_src_is_random_access (GstBaseSrc * src) { /* we need to start the basesrc to check random access */ - if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) { + if (!GST_BASE_SRC_IS_STARTED (src)) { GST_LOG_OBJECT (src, "doing start/stop to check get_range support"); - if (G_LIKELY (gst_base_src_start (src))) + if (G_LIKELY (gst_base_src_start (src))) { + if (gst_base_src_start_wait (src) != GST_FLOW_OK) + goto start_failed; gst_base_src_stop (src); + } } return src->random_access; + + /* ERRORS */ +start_failed: + { + GST_DEBUG_OBJECT (src, "failed to start"); + return FALSE; + } } static void @@ -2840,24 +2899,23 @@ static gboolean gst_base_src_start (GstBaseSrc * basesrc) { GstBaseSrcClass *bclass; - gboolean result, have_size; - guint64 size; - gboolean seekable; - GstFormat format; + gboolean result; - if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED)) - return TRUE; - - GST_DEBUG_OBJECT (basesrc, "starting source"); + GST_LIVE_LOCK (basesrc); + if (GST_BASE_SRC_IS_STARTING (basesrc)) + goto was_starting; + if (GST_BASE_SRC_IS_STARTED (basesrc)) + goto was_started; + basesrc->priv->start_result = GST_FLOW_WRONG_STATE; + GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING); basesrc->num_buffers_left = basesrc->num_buffers; - + basesrc->running = FALSE; + basesrc->priv->segment_pending = FALSE; GST_OBJECT_LOCK (basesrc); gst_segment_init (&basesrc->segment, basesrc->segment.format); GST_OBJECT_UNLOCK (basesrc); - - basesrc->running = FALSE; - basesrc->priv->segment_pending = FALSE; + GST_LIVE_UNLOCK (basesrc); bclass = GST_BASE_SRC_GET_CLASS (basesrc); if (bclass->start) @@ -2868,14 +2926,68 @@ gst_base_src_start (GstBaseSrc * basesrc) if (!result) goto could_not_start; - GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED); + if (!gst_base_src_is_async (basesrc)) + gst_base_src_start_complete (basesrc, GST_FLOW_OK); + return result; + + /* ERROR */ +was_starting: + { + GST_DEBUG_OBJECT (basesrc, "was starting"); + GST_LIVE_UNLOCK (basesrc); + return TRUE; + } +was_started: + { + GST_DEBUG_OBJECT (basesrc, "was started"); + GST_LIVE_UNLOCK (basesrc); + return TRUE; + } +could_not_start: + { + GST_DEBUG_OBJECT (basesrc, "could not start"); + /* subclass is supposed to post a message. We don't have to call _stop. */ + gst_base_src_start_complete (basesrc, GST_FLOW_ERROR); + return FALSE; + } +} + +/** + * gst_base_src_start_complete: + * @src: base source instance + * @ret: a #GstFlowReturn + * + * Complete an asynchronous start operation. When the subclass overrides the + * start method, it should call gst_base_src_start_complete() when the start + * operation completes either from the same thread or from an asynchronous + * helper thread. + */ +void +gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret) +{ + gboolean have_size; + guint64 size; + gboolean seekable; + GstFormat format; + GstPadMode mode; + GstEvent *event; + GstBaseSrcClass *bclass; + + bclass = GST_BASE_SRC_GET_CLASS (basesrc); + + if (ret != GST_FLOW_OK) + goto error; + + GST_DEBUG_OBJECT (basesrc, "starting source"); format = basesrc->segment.format; /* figure out the size */ have_size = FALSE; size = -1; if (format == GST_FORMAT_BYTES) { + bclass = GST_BASE_SRC_GET_CLASS (basesrc); + if (bclass->get_size) { if (!(have_size = bclass->get_size (basesrc, &size))) size = -1; @@ -2901,16 +3013,107 @@ gst_base_src_start (GstBaseSrc * basesrc) GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access); + /* stop flushing now but for live sources, still block in the LIVE lock when + * we are not yet PLAYING */ + gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL); + + GST_OBJECT_LOCK (basesrc->srcpad); + mode = GST_PAD_MODE (basesrc->srcpad); + GST_OBJECT_UNLOCK (basesrc->srcpad); + + if (mode == GST_PAD_MODE_PUSH) { + /* do initial seek, which will start the task */ + GST_OBJECT_LOCK (basesrc); + event = basesrc->pending_seek; + basesrc->pending_seek = NULL; + GST_OBJECT_UNLOCK (basesrc); + + /* no need to unlock anything, the task is certainly + * not running here. The perform seek code will start the task when + * finished. */ + if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE))) + goto seek_failed; + + if (event) + gst_event_unref (event); + } else { + /* if not random_access, we cannot operate in pull mode for now */ + if (G_UNLIKELY (!basesrc->random_access)) + goto no_get_range; + } + gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc)); - return TRUE; + GST_LIVE_LOCK (basesrc); + GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED); + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); + basesrc->priv->start_result = ret; + GST_LIVE_SIGNAL (basesrc); + GST_LIVE_UNLOCK (basesrc); - /* ERROR */ -could_not_start: + return; + +seek_failed: { - GST_DEBUG_OBJECT (basesrc, "could not start"); - /* subclass is supposed to post a message. We don't have to call _stop. */ - return FALSE; + GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek"); + gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); + if (event) + gst_event_unref (event); + ret = GST_FLOW_ERROR; + goto error; + } +no_get_range: + { + gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); + GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping"); + ret = GST_FLOW_ERROR; + goto error; + } +error: + { + GST_LIVE_LOCK (basesrc); + basesrc->priv->start_result = ret; + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); + GST_LIVE_SIGNAL (basesrc); + GST_LIVE_UNLOCK (basesrc); + return; + } +} + +/** + * gst_base_src_start_complete: + * @src: base source instance + * @ret: a #GstFlowReturn + * + * Wait until the start operation completes. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +gst_base_src_start_wait (GstBaseSrc * basesrc) +{ + GstFlowReturn result; + + GST_LIVE_LOCK (basesrc); + if (G_UNLIKELY (basesrc->priv->flushing)) + goto flushing; + + while (GST_BASE_SRC_IS_STARTING (basesrc)) { + GST_LIVE_WAIT (basesrc); + if (G_UNLIKELY (basesrc->priv->flushing)) + goto flushing; + } + result = basesrc->priv->start_result; + GST_LIVE_UNLOCK (basesrc); + + return result; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (basesrc, "we are flushing"); + GST_LIVE_UNLOCK (basesrc); + return GST_FLOW_WRONG_STATE; } } @@ -2920,21 +3123,37 @@ gst_base_src_stop (GstBaseSrc * basesrc) GstBaseSrcClass *bclass; gboolean result = TRUE; - if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED)) - return TRUE; - GST_DEBUG_OBJECT (basesrc, "stopping source"); + /* flush all */ + gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); + /* stop the task */ + gst_pad_stop_task (basesrc->srcpad); + + GST_LIVE_LOCK (basesrc); + if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc)) + goto was_stopped; + + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING); + GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED); + basesrc->priv->start_result = GST_FLOW_WRONG_STATE; + GST_LIVE_SIGNAL (basesrc); + GST_LIVE_UNLOCK (basesrc); + bclass = GST_BASE_SRC_GET_CLASS (basesrc); if (bclass->stop) result = bclass->stop (basesrc); gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0); - if (result) - GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED); - return result; + +was_stopped: + { + GST_DEBUG_OBJECT (basesrc, "was started"); + GST_LIVE_UNLOCK (basesrc); + return TRUE; + } } /* start or stop flushing dataprocessing @@ -3061,7 +3280,6 @@ static gboolean gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active) { GstBaseSrc *basesrc; - GstEvent *event; basesrc = GST_BASE_SRC (parent); @@ -3074,30 +3292,8 @@ gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active) if (G_UNLIKELY (!gst_base_src_start (basesrc))) goto error_start; - - basesrc->priv->discont = TRUE; - gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL); - - /* do initial seek, which will start the task */ - GST_OBJECT_LOCK (basesrc); - event = basesrc->pending_seek; - basesrc->pending_seek = NULL; - GST_OBJECT_UNLOCK (basesrc); - - /* no need to unlock anything, the task is certainly - * not running here. The perform seek code will start the task when - * finished. */ - if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE))) - goto seek_failed; - - if (event) - gst_event_unref (event); } else { GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode"); - /* flush all */ - gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); - /* stop the task */ - gst_pad_stop_task (pad); /* now we can stop the source */ if (G_UNLIKELY (!gst_base_src_stop (basesrc))) goto error_stop; @@ -3115,19 +3311,6 @@ error_start: GST_WARNING_OBJECT (basesrc, "Failed to start in push mode"); return FALSE; } -seek_failed: - { - GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek"); - /* flush all */ - gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); - /* stop the task */ - gst_pad_stop_task (pad); - /* Stop the basesrc */ - gst_base_src_stop (basesrc); - if (event) - gst_event_unref (event); - return FALSE; - } error_stop: { GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode"); @@ -3147,19 +3330,8 @@ gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active) GST_DEBUG_OBJECT (basesrc, "Activating in pull mode"); if (G_UNLIKELY (!gst_base_src_start (basesrc))) goto error_start; - - /* if not random_access, we cannot operate in pull mode for now */ - if (G_UNLIKELY (!gst_base_src_is_random_access (basesrc))) - goto no_get_range; - - /* stop flushing now but for live sources, still block in the LIVE lock when - * we are not yet PLAYING */ - gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL); } else { GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode"); - /* flush all, there is no task to stop */ - gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL); - if (G_UNLIKELY (!gst_base_src_stop (basesrc))) goto error_stop; } @@ -3171,12 +3343,6 @@ error_start: GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode"); return FALSE; } -no_get_range: - { - GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping"); - gst_base_src_stop (basesrc); - return FALSE; - } error_stop: { GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode"); @@ -3248,13 +3414,10 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_READY: { - GstEvent **event_p; - /* we don't need to unblock anything here, the pad deactivation code * already did this */ g_atomic_int_set (&basesrc->priv->pending_eos, FALSE); - event_p = &basesrc->pending_seek; - gst_event_replace (event_p, NULL); + gst_event_replace (&basesrc->pending_seek, NULL); break; } case GST_STATE_CHANGE_READY_TO_NULL: diff --git a/libs/gst/base/gstbasesrc.h b/libs/gst/base/gstbasesrc.h index 0624096f21..59568669b6 100644 --- a/libs/gst/base/gstbasesrc.h +++ b/libs/gst/base/gstbasesrc.h @@ -38,17 +38,22 @@ G_BEGIN_DECLS /** * GstBaseSrcFlags: - * @GST_BASE_SRC_STARTED: has source been started + * @GST_BASE_SRC_FLAG_STARTING: has source is starting + * @GST_BASE_SRC_FLAG_STARTED: has source been started * @GST_BASE_SRC_FLAG_LAST: offset to define more flags * * The #GstElement flags that a basesrc element may have. */ typedef enum { - GST_BASE_SRC_STARTED = (GST_ELEMENT_FLAG_LAST << 0), + GST_BASE_SRC_FLAG_STARTING = (GST_ELEMENT_FLAG_LAST << 0), + GST_BASE_SRC_FLAG_STARTED = (GST_ELEMENT_FLAG_LAST << 1), /* padding */ - GST_BASE_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) + GST_BASE_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 16) } GstBaseSrcFlags; +#define GST_BASE_SRC_IS_STARTING(obj) GST_OBJECT_FLAG_IS_SET ((obj), GST_BASE_SRC_FLAG_STARTING) +#define GST_BASE_SRC_IS_STARTED(obj) GST_OBJECT_FLAG_IS_SET ((obj), GST_BASE_SRC_FLAG_STARTED) + typedef struct _GstBaseSrc GstBaseSrc; typedef struct _GstBaseSrcClass GstBaseSrcClass; typedef struct _GstBaseSrcPrivate GstBaseSrcPrivate; @@ -115,7 +120,9 @@ struct _GstBaseSrc { * @set_caps: Notify subclass of changed output caps * @decide_allocation: configure the allocation query * @start: Start processing. Subclasses should open resources and prepare - * to produce data. + * to produce data. Implementation should call gst_base_src_start_complete() + * when the operation completes, either from the current thread or any other + * thread that finishes the start operation asynchronously. * @stop: Stop processing. Subclasses should use this to close resources. * @get_times: Given a buffer, return the start and stop time when it * should be pushed out. The base class will sync on the clock using @@ -233,6 +240,12 @@ void gst_base_src_set_format (GstBaseSrc *src, GstFormat format void gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic); +void gst_base_src_set_async (GstBaseSrc *src, gboolean async); +gboolean gst_base_src_is_async (GstBaseSrc *src); + +void gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret); +GstFlowReturn gst_base_src_start_wait (GstBaseSrc * basesrc); + gboolean gst_base_src_query_latency (GstBaseSrc *src, gboolean * live, GstClockTime * min_latency, GstClockTime * max_latency); diff --git a/plugins/elements/gstfakesrc.c b/plugins/elements/gstfakesrc.c index df46d18cf2..9a367a0810 100644 --- a/plugins/elements/gstfakesrc.c +++ b/plugins/elements/gstfakesrc.c @@ -526,11 +526,13 @@ gst_fake_src_set_property (GObject * object, guint prop_id, src->dump = g_value_get_boolean (value); break; case PROP_CAN_ACTIVATE_PUSH: - g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object, GST_BASE_SRC_STARTED)); + g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object, + GST_BASE_SRC_FLAG_STARTED)); GST_BASE_SRC (src)->can_activate_push = g_value_get_boolean (value); break; case PROP_CAN_ACTIVATE_PULL: - g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object, GST_BASE_SRC_STARTED)); + g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object, + GST_BASE_SRC_FLAG_STARTED)); src->can_activate_pull = g_value_get_boolean (value); break; case PROP_IS_LIVE: