basesrc: add async start option

Add a method to enable async start behaviour. The subclass can then complete the
start operation from any other thread by caling gst_base_src_start_complete().
The base class can wait for the start to complete with
gst_base_src_start_wait().
This commit is contained in:
Wim Taymans 2011-12-06 14:01:50 +01:00
parent 960564831e
commit ebc25e895f
3 changed files with 270 additions and 92 deletions

View file

@ -213,6 +213,9 @@ struct _GstBaseSrcPrivate
gboolean discont;
gboolean flushing;
GstFlowReturn start_result;
gboolean async;
/* if segment should be sent */
gboolean segment_pending;
@ -317,6 +320,7 @@ static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
static gboolean gst_base_src_start (GstBaseSrc * basesrc);
static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
@ -435,7 +439,9 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
g_atomic_int_set (&basesrc->priv->have_events, FALSE);
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
GST_DEBUG_OBJECT (basesrc, "init done");
@ -597,6 +603,49 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
g_atomic_int_set (&src->priv->dynamic_size, dynamic);
}
/**
* gst_base_src_set_async:
* @src: base source instance
* @async: new async mode
*
* Configure async behaviour in @src, no state change will block. The open,
* close, start, stop, play and pause virtual methods will be executed in a
* different thread and are thus allowed to perform blocking operations. Any
* blocking operation should be unblocked with the unlock vmethod.
*/
void
gst_base_src_set_async (GstBaseSrc * src, gboolean async)
{
g_return_if_fail (GST_IS_BASE_SRC (src));
GST_OBJECT_LOCK (src);
src->priv->async = async;
GST_OBJECT_UNLOCK (src);
}
/**
* gst_base_src_is_async:
* @src: base source instance
*
* Get the current async behaviour of @src. See also gst_base_src_set_async().
*
* Returns: %TRUE if @src is operating in async mode.
*/
gboolean
gst_base_src_is_async (GstBaseSrc * src)
{
gboolean res;
g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
GST_OBJECT_LOCK (src);
res = src->priv->async;
GST_OBJECT_UNLOCK (src);
return res;
}
/**
* gst_base_src_query_latency:
* @src: the source
@ -2179,7 +2228,7 @@ again:
}
}
if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)))
goto not_started;
if (G_UNLIKELY (!bclass->create))
@ -2364,13 +2413,23 @@ static gboolean
gst_base_src_is_random_access (GstBaseSrc * src)
{
/* we need to start the basesrc to check random access */
if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
if (!GST_BASE_SRC_IS_STARTED (src)) {
GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
if (G_LIKELY (gst_base_src_start (src)))
if (G_LIKELY (gst_base_src_start (src))) {
if (gst_base_src_start_wait (src) != GST_FLOW_OK)
goto start_failed;
gst_base_src_stop (src);
}
}
return src->random_access;
/* ERRORS */
start_failed:
{
GST_DEBUG_OBJECT (src, "failed to start");
return FALSE;
}
}
static void
@ -2840,24 +2899,23 @@ static gboolean
gst_base_src_start (GstBaseSrc * basesrc)
{
GstBaseSrcClass *bclass;
gboolean result, have_size;
guint64 size;
gboolean seekable;
GstFormat format;
gboolean result;
if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
return TRUE;
GST_DEBUG_OBJECT (basesrc, "starting source");
GST_LIVE_LOCK (basesrc);
if (GST_BASE_SRC_IS_STARTING (basesrc))
goto was_starting;
if (GST_BASE_SRC_IS_STARTED (basesrc))
goto was_started;
basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
basesrc->num_buffers_left = basesrc->num_buffers;
basesrc->running = FALSE;
basesrc->priv->segment_pending = FALSE;
GST_OBJECT_LOCK (basesrc);
gst_segment_init (&basesrc->segment, basesrc->segment.format);
GST_OBJECT_UNLOCK (basesrc);
basesrc->running = FALSE;
basesrc->priv->segment_pending = FALSE;
GST_LIVE_UNLOCK (basesrc);
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
if (bclass->start)
@ -2868,14 +2926,68 @@ gst_base_src_start (GstBaseSrc * basesrc)
if (!result)
goto could_not_start;
GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
if (!gst_base_src_is_async (basesrc))
gst_base_src_start_complete (basesrc, GST_FLOW_OK);
return result;
/* ERROR */
was_starting:
{
GST_DEBUG_OBJECT (basesrc, "was starting");
GST_LIVE_UNLOCK (basesrc);
return TRUE;
}
was_started:
{
GST_DEBUG_OBJECT (basesrc, "was started");
GST_LIVE_UNLOCK (basesrc);
return TRUE;
}
could_not_start:
{
GST_DEBUG_OBJECT (basesrc, "could not start");
/* subclass is supposed to post a message. We don't have to call _stop. */
gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
return FALSE;
}
}
/**
* gst_base_src_start_complete:
* @src: base source instance
* @ret: a #GstFlowReturn
*
* Complete an asynchronous start operation. When the subclass overrides the
* start method, it should call gst_base_src_start_complete() when the start
* operation completes either from the same thread or from an asynchronous
* helper thread.
*/
void
gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
{
gboolean have_size;
guint64 size;
gboolean seekable;
GstFormat format;
GstPadMode mode;
GstEvent *event;
GstBaseSrcClass *bclass;
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
if (ret != GST_FLOW_OK)
goto error;
GST_DEBUG_OBJECT (basesrc, "starting source");
format = basesrc->segment.format;
/* figure out the size */
have_size = FALSE;
size = -1;
if (format == GST_FORMAT_BYTES) {
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
if (bclass->get_size) {
if (!(have_size = bclass->get_size (basesrc, &size)))
size = -1;
@ -2901,16 +3013,107 @@ gst_base_src_start (GstBaseSrc * basesrc)
GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
/* stop flushing now but for live sources, still block in the LIVE lock when
* we are not yet PLAYING */
gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
GST_OBJECT_LOCK (basesrc->srcpad);
mode = GST_PAD_MODE (basesrc->srcpad);
GST_OBJECT_UNLOCK (basesrc->srcpad);
if (mode == GST_PAD_MODE_PUSH) {
/* do initial seek, which will start the task */
GST_OBJECT_LOCK (basesrc);
event = basesrc->pending_seek;
basesrc->pending_seek = NULL;
GST_OBJECT_UNLOCK (basesrc);
/* no need to unlock anything, the task is certainly
* not running here. The perform seek code will start the task when
* finished. */
if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
goto seek_failed;
if (event)
gst_event_unref (event);
} else {
/* if not random_access, we cannot operate in pull mode for now */
if (G_UNLIKELY (!basesrc->random_access))
goto no_get_range;
}
gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
return TRUE;
GST_LIVE_LOCK (basesrc);
GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
basesrc->priv->start_result = ret;
GST_LIVE_SIGNAL (basesrc);
GST_LIVE_UNLOCK (basesrc);
/* ERROR */
could_not_start:
return;
seek_failed:
{
GST_DEBUG_OBJECT (basesrc, "could not start");
/* subclass is supposed to post a message. We don't have to call _stop. */
return FALSE;
GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
if (event)
gst_event_unref (event);
ret = GST_FLOW_ERROR;
goto error;
}
no_get_range:
{
gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
ret = GST_FLOW_ERROR;
goto error;
}
error:
{
GST_LIVE_LOCK (basesrc);
basesrc->priv->start_result = ret;
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
GST_LIVE_SIGNAL (basesrc);
GST_LIVE_UNLOCK (basesrc);
return;
}
}
/**
* gst_base_src_start_complete:
* @src: base source instance
* @ret: a #GstFlowReturn
*
* Wait until the start operation completes.
*
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
gst_base_src_start_wait (GstBaseSrc * basesrc)
{
GstFlowReturn result;
GST_LIVE_LOCK (basesrc);
if (G_UNLIKELY (basesrc->priv->flushing))
goto flushing;
while (GST_BASE_SRC_IS_STARTING (basesrc)) {
GST_LIVE_WAIT (basesrc);
if (G_UNLIKELY (basesrc->priv->flushing))
goto flushing;
}
result = basesrc->priv->start_result;
GST_LIVE_UNLOCK (basesrc);
return result;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (basesrc, "we are flushing");
GST_LIVE_UNLOCK (basesrc);
return GST_FLOW_WRONG_STATE;
}
}
@ -2920,21 +3123,37 @@ gst_base_src_stop (GstBaseSrc * basesrc)
GstBaseSrcClass *bclass;
gboolean result = TRUE;
if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
return TRUE;
GST_DEBUG_OBJECT (basesrc, "stopping source");
/* flush all */
gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
/* stop the task */
gst_pad_stop_task (basesrc->srcpad);
GST_LIVE_LOCK (basesrc);
if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
goto was_stopped;
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
GST_LIVE_SIGNAL (basesrc);
GST_LIVE_UNLOCK (basesrc);
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
if (bclass->stop)
result = bclass->stop (basesrc);
gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0);
if (result)
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
return result;
was_stopped:
{
GST_DEBUG_OBJECT (basesrc, "was started");
GST_LIVE_UNLOCK (basesrc);
return TRUE;
}
}
/* start or stop flushing dataprocessing
@ -3061,7 +3280,6 @@ static gboolean
gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
{
GstBaseSrc *basesrc;
GstEvent *event;
basesrc = GST_BASE_SRC (parent);
@ -3074,30 +3292,8 @@ gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
if (G_UNLIKELY (!gst_base_src_start (basesrc)))
goto error_start;
basesrc->priv->discont = TRUE;
gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
/* do initial seek, which will start the task */
GST_OBJECT_LOCK (basesrc);
event = basesrc->pending_seek;
basesrc->pending_seek = NULL;
GST_OBJECT_UNLOCK (basesrc);
/* no need to unlock anything, the task is certainly
* not running here. The perform seek code will start the task when
* finished. */
if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
goto seek_failed;
if (event)
gst_event_unref (event);
} else {
GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
/* flush all */
gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
/* stop the task */
gst_pad_stop_task (pad);
/* now we can stop the source */
if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
goto error_stop;
@ -3115,19 +3311,6 @@ error_start:
GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
return FALSE;
}
seek_failed:
{
GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
/* flush all */
gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
/* stop the task */
gst_pad_stop_task (pad);
/* Stop the basesrc */
gst_base_src_stop (basesrc);
if (event)
gst_event_unref (event);
return FALSE;
}
error_stop:
{
GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
@ -3147,19 +3330,8 @@ gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
if (G_UNLIKELY (!gst_base_src_start (basesrc)))
goto error_start;
/* if not random_access, we cannot operate in pull mode for now */
if (G_UNLIKELY (!gst_base_src_is_random_access (basesrc)))
goto no_get_range;
/* stop flushing now but for live sources, still block in the LIVE lock when
* we are not yet PLAYING */
gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
} else {
GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
/* flush all, there is no task to stop */
gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
goto error_stop;
}
@ -3171,12 +3343,6 @@ error_start:
GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
return FALSE;
}
no_get_range:
{
GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
gst_base_src_stop (basesrc);
return FALSE;
}
error_stop:
{
GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
@ -3248,13 +3414,10 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
{
GstEvent **event_p;
/* we don't need to unblock anything here, the pad deactivation code
* already did this */
g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
event_p = &basesrc->pending_seek;
gst_event_replace (event_p, NULL);
gst_event_replace (&basesrc->pending_seek, NULL);
break;
}
case GST_STATE_CHANGE_READY_TO_NULL:

View file

@ -38,17 +38,22 @@ G_BEGIN_DECLS
/**
* GstBaseSrcFlags:
* @GST_BASE_SRC_STARTED: has source been started
* @GST_BASE_SRC_FLAG_STARTING: has source is starting
* @GST_BASE_SRC_FLAG_STARTED: has source been started
* @GST_BASE_SRC_FLAG_LAST: offset to define more flags
*
* The #GstElement flags that a basesrc element may have.
*/
typedef enum {
GST_BASE_SRC_STARTED = (GST_ELEMENT_FLAG_LAST << 0),
GST_BASE_SRC_FLAG_STARTING = (GST_ELEMENT_FLAG_LAST << 0),
GST_BASE_SRC_FLAG_STARTED = (GST_ELEMENT_FLAG_LAST << 1),
/* padding */
GST_BASE_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
GST_BASE_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 16)
} GstBaseSrcFlags;
#define GST_BASE_SRC_IS_STARTING(obj) GST_OBJECT_FLAG_IS_SET ((obj), GST_BASE_SRC_FLAG_STARTING)
#define GST_BASE_SRC_IS_STARTED(obj) GST_OBJECT_FLAG_IS_SET ((obj), GST_BASE_SRC_FLAG_STARTED)
typedef struct _GstBaseSrc GstBaseSrc;
typedef struct _GstBaseSrcClass GstBaseSrcClass;
typedef struct _GstBaseSrcPrivate GstBaseSrcPrivate;
@ -115,7 +120,9 @@ struct _GstBaseSrc {
* @set_caps: Notify subclass of changed output caps
* @decide_allocation: configure the allocation query
* @start: Start processing. Subclasses should open resources and prepare
* to produce data.
* to produce data. Implementation should call gst_base_src_start_complete()
* when the operation completes, either from the current thread or any other
* thread that finishes the start operation asynchronously.
* @stop: Stop processing. Subclasses should use this to close resources.
* @get_times: Given a buffer, return the start and stop time when it
* should be pushed out. The base class will sync on the clock using
@ -233,6 +240,12 @@ void gst_base_src_set_format (GstBaseSrc *src, GstFormat format
void gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic);
void gst_base_src_set_async (GstBaseSrc *src, gboolean async);
gboolean gst_base_src_is_async (GstBaseSrc *src);
void gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret);
GstFlowReturn gst_base_src_start_wait (GstBaseSrc * basesrc);
gboolean gst_base_src_query_latency (GstBaseSrc *src, gboolean * live,
GstClockTime * min_latency,
GstClockTime * max_latency);

View file

@ -526,11 +526,13 @@ gst_fake_src_set_property (GObject * object, guint prop_id,
src->dump = g_value_get_boolean (value);
break;
case PROP_CAN_ACTIVATE_PUSH:
g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object, GST_BASE_SRC_STARTED));
g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object,
GST_BASE_SRC_FLAG_STARTED));
GST_BASE_SRC (src)->can_activate_push = g_value_get_boolean (value);
break;
case PROP_CAN_ACTIVATE_PULL:
g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object, GST_BASE_SRC_STARTED));
g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object,
GST_BASE_SRC_FLAG_STARTED));
src->can_activate_pull = g_value_get_boolean (value);
break;
case PROP_IS_LIVE: