app: add callbacks to appsrc, cleanups

Add a uri handler to appsink.
don't emit signals when we have installed callbacks on appsink.

Add callbacks to appsrc to replace the signals.
Add property to disable callbacks in appsrc, default to TRUE for backwards
compatibility but disable when callbacks are installed.

API: GstAppSrc::emit-signals
API: GstAppSrc::gst_app_src_set_emit_signals()
API: GstAppSrc::gst_app_src_get_emit_signals()
API: GstAppSrc::gst_app_src_set_callbacks()
This commit is contained in:
Wim Taymans 2009-02-26 16:44:53 +01:00
parent 661f2da6e0
commit c4036dd701
5 changed files with 279 additions and 17 deletions

View file

@ -15,6 +15,10 @@ gst_app_src_set_stream_type
gst_app_src_get_stream_type gst_app_src_get_stream_type
gst_app_src_set_max_bytes gst_app_src_set_max_bytes
gst_app_src_get_max_bytes gst_app_src_get_max_bytes
gst_app_src_get_emit_signals
gst_app_src_set_emit_signals
GstAppSrcCallbacks
gst_app_src_set_callbacks
gst_app_src_push_buffer gst_app_src_push_buffer
gst_app_src_end_of_stream gst_app_src_end_of_stream
<SUBSECTION Standard> <SUBSECTION Standard>

View file

@ -148,6 +148,9 @@ GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_ALWAYS, GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY); GST_STATIC_CAPS_ANY);
static void gst_app_sink_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static void gst_app_sink_dispose (GObject * object); static void gst_app_sink_dispose (GObject * object);
static void gst_app_sink_finalize (GObject * object); static void gst_app_sink_finalize (GObject * object);
@ -169,7 +172,20 @@ static GstCaps *gst_app_sink_getcaps (GstBaseSink * psink);
static guint gst_app_sink_signals[LAST_SIGNAL] = { 0 }; static guint gst_app_sink_signals[LAST_SIGNAL] = { 0 };
GST_BOILERPLATE (GstAppSink, gst_app_sink, GstBaseSink, GST_TYPE_BASE_SINK); static void
_do_init (GType filesrc_type)
{
static const GInterfaceInfo urihandler_info = {
gst_app_sink_uri_handler_init,
NULL,
NULL
};
g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
&urihandler_info);
}
GST_BOILERPLATE_FULL (GstAppSink, gst_app_sink, GstBaseSink, GST_TYPE_BASE_SINK,
_do_init);
/* Can't use glib-genmarshal for this, as it doesn't know how to handle /* Can't use glib-genmarshal for this, as it doesn't know how to handle
* GstMiniObject-based types, which are a new fundamental type */ * GstMiniObject-based types, which are a new fundamental type */
@ -582,10 +598,11 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
g_mutex_unlock (appsink->priv->mutex); g_mutex_unlock (appsink->priv->mutex);
/* emit EOS now */ /* emit EOS now */
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0);
if (appsink->priv->callbacks.eos) if (appsink->priv->callbacks.eos)
appsink->priv->callbacks.eos (appsink, appsink->priv->user_data); appsink->priv->callbacks.eos (appsink, appsink->priv->user_data);
else
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0);
break; break;
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
/* we don't have to do anything here, the base class will call unlock /* we don't have to do anything here, the base class will call unlock
@ -620,11 +637,10 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
emit = appsink->priv->emit_signals; emit = appsink->priv->emit_signals;
g_mutex_unlock (appsink->priv->mutex); g_mutex_unlock (appsink->priv->mutex);
if (emit)
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_PREROLL], 0);
if (appsink->priv->callbacks.new_preroll) if (appsink->priv->callbacks.new_preroll)
appsink->priv->callbacks.new_preroll (appsink, appsink->priv->user_data); appsink->priv->callbacks.new_preroll (appsink, appsink->priv->user_data);
else if (emit)
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_PREROLL], 0);
return GST_FLOW_OK; return GST_FLOW_OK;
@ -673,11 +689,10 @@ gst_app_sink_render (GstBaseSink * psink, GstBuffer * buffer)
emit = appsink->priv->emit_signals; emit = appsink->priv->emit_signals;
g_mutex_unlock (appsink->priv->mutex); g_mutex_unlock (appsink->priv->mutex);
if (emit)
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_BUFFER], 0);
if (appsink->priv->callbacks.new_buffer) if (appsink->priv->callbacks.new_buffer)
appsink->priv->callbacks.new_buffer (appsink, appsink->priv->user_data); appsink->priv->callbacks.new_buffer (appsink, appsink->priv->user_data);
else if (emit)
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_BUFFER], 0);
return GST_FLOW_OK; return GST_FLOW_OK;
@ -1106,6 +1121,9 @@ not_started:
* This is an alternative to using the signals, it has lower overhead and is thus * This is an alternative to using the signals, it has lower overhead and is thus
* less expensive, but also less flexible. * less expensive, but also less flexible.
* *
* If callbacks are installed, no signals will be emited for performance
* reasons.
*
* Since: 0.10.23 * Since: 0.10.23
*/ */
void void
@ -1139,3 +1157,49 @@ gst_app_sink_set_callbacks (GstAppSink * appsink,
appsink->priv->notify = notify; appsink->priv->notify = notify;
GST_OBJECT_UNLOCK (appsink); GST_OBJECT_UNLOCK (appsink);
} }
/*** GSTURIHANDLER INTERFACE *************************************************/
static GstURIType
gst_app_sink_uri_get_type (void)
{
return GST_URI_SINK;
}
static gchar **
gst_app_sink_uri_get_protocols (void)
{
static gchar *protocols[] = { "appsink", NULL };
return protocols;
}
static const gchar *
gst_app_sink_uri_get_uri (GstURIHandler * handler)
{
return "appsink";
}
static gboolean
gst_app_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri)
{
gchar *protocol;
gboolean ret;
protocol = gst_uri_get_protocol (uri);
ret = !strcmp (protocol, "appsink");
g_free (protocol);
return ret;
}
static void
gst_app_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
iface->get_type = gst_app_sink_uri_get_type;
iface->get_protocols = gst_app_sink_uri_get_protocols;
iface->get_uri = gst_app_sink_uri_get_uri;
iface->set_uri = gst_app_sink_uri_set_uri;
}

View file

@ -136,6 +136,11 @@ struct _GstAppSrcPrivate
guint64 min_latency; guint64 min_latency;
guint64 max_latency; guint64 max_latency;
gboolean emit_signals;
GstAppSrcCallbacks callbacks;
gpointer user_data;
GDestroyNotify notify;
}; };
GST_DEBUG_CATEGORY_STATIC (app_src_debug); GST_DEBUG_CATEGORY_STATIC (app_src_debug);
@ -163,6 +168,7 @@ enum
#define DEFAULT_PROP_IS_LIVE FALSE #define DEFAULT_PROP_IS_LIVE FALSE
#define DEFAULT_PROP_MIN_LATENCY -1 #define DEFAULT_PROP_MIN_LATENCY -1
#define DEFAULT_PROP_MAX_LATENCY -1 #define DEFAULT_PROP_MAX_LATENCY -1
#define DEFAULT_PROP_EMIT_SIGNALS TRUE
enum enum
{ {
@ -176,6 +182,7 @@ enum
PROP_IS_LIVE, PROP_IS_LIVE,
PROP_MIN_LATENCY, PROP_MIN_LATENCY,
PROP_MAX_LATENCY, PROP_MAX_LATENCY,
PROP_EMIT_SIGNALS,
PROP_LAST PROP_LAST
}; };
@ -376,6 +383,20 @@ gst_app_src_class_init (GstAppSrcClass * klass)
-1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY, -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::emit-signals
*
* Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
* This option is by default enabled for backwards compatibility reasons but
* can disabled when needed because signal emission is expensive.
*
* Since: 0.10.23
*/
g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
g_param_spec_boolean ("emit-signals", "Emit signals",
"Emit new-preroll and new-buffer signals", DEFAULT_PROP_EMIT_SIGNALS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/** /**
* GstAppSrc::need-data: * GstAppSrc::need-data:
* @appsrc: the appsrc element that emited the signal * @appsrc: the appsrc element that emited the signal
@ -490,6 +511,7 @@ gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
appsrc->priv->block = DEFAULT_PROP_BLOCK; appsrc->priv->block = DEFAULT_PROP_BLOCK;
appsrc->priv->min_latency = DEFAULT_PROP_MIN_LATENCY; appsrc->priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
appsrc->priv->max_latency = DEFAULT_PROP_MAX_LATENCY; appsrc->priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
appsrc->priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE); gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
} }
@ -567,6 +589,9 @@ gst_app_src_set_property (GObject * object, guint prop_id,
gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE, gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
g_value_get_int64 (value)); g_value_get_int64 (value));
break; break;
case PROP_EMIT_SIGNALS:
gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -625,6 +650,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
g_value_set_int64 (value, max); g_value_set_int64 (value, max);
break; break;
} }
case PROP_EMIT_SIGNALS:
g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -789,8 +817,20 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
if (appsrc->priv->stream_type == GST_APP_STREAM_TYPE_STREAM) if (appsrc->priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
return TRUE; return TRUE;
if (appsrc->priv->callbacks.seek_data)
res = appsrc->priv->callbacks.seek_data (appsrc, desired_position,
appsrc->priv->user_data);
else {
gboolean emit;
g_mutex_lock (appsrc->priv->mutex);
emit = appsrc->priv->emit_signals;
g_mutex_unlock (appsrc->priv->mutex);
if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
desired_position, &res); desired_position, &res);
}
if (res) { if (res) {
GST_DEBUG_OBJECT (appsrc, "flushing queue"); GST_DEBUG_OBJECT (appsrc, "flushing queue");
@ -819,13 +859,19 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
* changed. */ * changed. */
if (G_UNLIKELY (appsrc->priv->offset != offset)) { if (G_UNLIKELY (appsrc->priv->offset != offset)) {
gboolean res; gboolean res;
gboolean emit;
emit = appsrc->priv->emit_signals;
g_mutex_unlock (appsrc->priv->mutex); g_mutex_unlock (appsrc->priv->mutex);
GST_DEBUG_OBJECT (appsrc, GST_DEBUG_OBJECT (appsrc,
"we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT, "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
appsrc->priv->offset, offset); appsrc->priv->offset, offset);
if (appsrc->priv->callbacks.seek_data)
res = appsrc->priv->callbacks.seek_data (appsrc, offset,
appsrc->priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
offset, &res); offset, &res);
@ -864,9 +910,16 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
ret = GST_FLOW_OK; ret = GST_FLOW_OK;
break; break;
} else { } else {
gboolean emit;
emit = appsrc->priv->emit_signals;
g_mutex_unlock (appsrc->priv->mutex); g_mutex_unlock (appsrc->priv->mutex);
/* we have no data, we need some. We fire the signal with the size hint. */ /* we have no data, we need some. We fire the signal with the size hint. */
if (appsrc->priv->callbacks.need_data)
appsrc->priv->callbacks.need_data (appsrc, size,
appsrc->priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size, g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
NULL); NULL);
@ -1194,6 +1247,52 @@ gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
g_mutex_unlock (appsrc->priv->mutex); g_mutex_unlock (appsrc->priv->mutex);
} }
/**
* gst_app_src_set_emit_signals:
* @appsrc: a #GstAppSrc
* @emit: the new state
*
* Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
* by default disabled because signal emission is expensive and unneeded when
* the application prefers to operate in pull mode.
*
* Since: 0.10.23
*/
void
gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
{
g_return_if_fail (GST_IS_APP_SRC (appsrc));
g_mutex_lock (appsrc->priv->mutex);
appsrc->priv->emit_signals = emit;
g_mutex_unlock (appsrc->priv->mutex);
}
/**
* gst_app_src_get_emit_signals:
* @appsrc: a #GstAppSrc
*
* Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
*
* Returns: %TRUE if @appsrc is emiting the "new-preroll" and "new-buffer"
* signals.
*
* Since: 0.10.23
*/
gboolean
gst_app_src_get_emit_signals (GstAppSrc * appsrc)
{
gboolean result;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
g_mutex_lock (appsrc->priv->mutex);
result = appsrc->priv->emit_signals;
g_mutex_unlock (appsrc->priv->mutex);
return result;
}
static GstFlowReturn static GstFlowReturn
gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer, gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
gboolean steal_ref) gboolean steal_ref)
@ -1221,9 +1320,15 @@ gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
appsrc->priv->queued_bytes, appsrc->priv->max_bytes); appsrc->priv->queued_bytes, appsrc->priv->max_bytes);
if (first) { if (first) {
gboolean emit;
emit = appsrc->priv->emit_signals;
/* only signal on the first push */ /* only signal on the first push */
g_mutex_unlock (appsrc->priv->mutex); g_mutex_unlock (appsrc->priv->mutex);
if (appsrc->priv->callbacks.enough_data)
appsrc->priv->callbacks.enough_data (appsrc, appsrc->priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0, g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
NULL); NULL);
@ -1284,6 +1389,9 @@ eos:
* Adds a buffer to the queue of buffers that the appsrc element will * Adds a buffer to the queue of buffers that the appsrc element will
* push to its source pad. This function takes ownership of the buffer. * push to its source pad. This function takes ownership of the buffer.
* *
* When the block property is TRUE, this function can block until free
* space becomes available in the queue.
*
* Returns: #GST_FLOW_OK when the buffer was successfuly queued. * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
* #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING. * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
* #GST_FLOW_UNEXPECTED when EOS occured. * #GST_FLOW_UNEXPECTED when EOS occured.
@ -1343,6 +1451,55 @@ flushing:
} }
} }
/**
* gst_app_src_set_callbacks:
* @appsrc: a #GstAppSrc
* @callbacks: the callbacks
* @user_data: a user_data argument for the callbacks
* @notify: a destroy notify function
*
* Set callbacks which will be executed when data is needed, enough data has
* been collected or when a seek should be performed.
* This is an alternative to using the signals, it has lower overhead and is thus
* less expensive, but also less flexible.
*
* If callbacks are installed, no signals will be emited for performance
* reasons.
*
* Since: 0.10.23
*/
void
gst_app_src_set_callbacks (GstAppSrc * appsrc,
GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
{
GDestroyNotify old_notify;
g_return_if_fail (appsrc != NULL);
g_return_if_fail (GST_IS_APP_SRC (appsrc));
g_return_if_fail (callbacks != NULL);
GST_OBJECT_LOCK (appsrc);
old_notify = appsrc->priv->notify;
if (old_notify) {
gpointer old_data;
old_data = appsrc->priv->user_data;
appsrc->priv->user_data = NULL;
appsrc->priv->notify = NULL;
GST_OBJECT_UNLOCK (appsrc);
old_notify (old_data);
GST_OBJECT_LOCK (appsrc);
}
appsrc->priv->callbacks = *callbacks;
appsrc->priv->user_data = user_data;
appsrc->priv->notify = notify;
GST_OBJECT_UNLOCK (appsrc);
}
/*** GSTURIHANDLER INTERFACE *************************************************/ /*** GSTURIHANDLER INTERFACE *************************************************/
static GstURIType static GstURIType

View file

@ -40,6 +40,32 @@ typedef struct _GstAppSrc GstAppSrc;
typedef struct _GstAppSrcClass GstAppSrcClass; typedef struct _GstAppSrcClass GstAppSrcClass;
typedef struct _GstAppSrcPrivate GstAppSrcPrivate; typedef struct _GstAppSrcPrivate GstAppSrcPrivate;
/**
* GstAppSrcCallbacks:
* @need_data: Called when the appsrc needs more data. A buffer or EOS should be
* pushed to appsrc from this thread or another thread. @length is just a hint
* and when it is set to -1, any number of bytes can be pushed into @appsrc.
* @enough_data: Called when appsrc has enough data. It is recommended that the
* application stops calling push-buffer until the need_data callback is
* emited again to avoid excessive buffer queueing.
* @seek_data: Called when a seek should be performed to the offset.
* The next push-buffer should produce buffers from the new @offset.
* This callback is only called for seekable stream types.
*
* A set of callbacks that can be installed on the appsrc with
* gst_app_src_set_callbacks().
*
* Since: 0.10.23
*/
typedef struct {
void (*need_data) (GstAppSrc *src, guint length, gpointer user_data);
void (*enough_data) (GstAppSrc *src, gpointer user_data);
gboolean (*seek_data) (GstAppSrc *src, guint64 offset, gpointer user_data);
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
} GstAppSrcCallbacks;
/** /**
* GstAppStreamType: * GstAppStreamType:
* @GST_APP_STREAM_TYPE_STREAM: No seeking is supported in the stream, such as a * @GST_APP_STREAM_TYPE_STREAM: No seeking is supported in the stream, such as a
@ -103,9 +129,17 @@ guint64 gst_app_src_get_max_bytes (GstAppSrc *appsrc);
void gst_app_src_set_latency (GstAppSrc *appsrc, guint64 min, guint64 max); void gst_app_src_set_latency (GstAppSrc *appsrc, guint64 min, guint64 max);
void gst_app_src_get_latency (GstAppSrc *appsrc, guint64 *min, guint64 *max); void gst_app_src_get_latency (GstAppSrc *appsrc, guint64 *min, guint64 *max);
void gst_app_src_set_emit_signals (GstAppSrc *appsrc, gboolean emit);
gboolean gst_app_src_get_emit_signals (GstAppSrc *appsrc);
GstFlowReturn gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer); GstFlowReturn gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer);
GstFlowReturn gst_app_src_end_of_stream (GstAppSrc *appsrc); GstFlowReturn gst_app_src_end_of_stream (GstAppSrc *appsrc);
void gst_app_src_set_callbacks (GstAppSrc * appsrc,
GstAppSrcCallbacks *callbacks,
gpointer user_data,
GDestroyNotify notify);
G_END_DECLS G_END_DECLS
#endif #endif

View file

@ -16,13 +16,16 @@ EXPORTS
gst_app_sink_set_max_buffers gst_app_sink_set_max_buffers
gst_app_src_end_of_stream gst_app_src_end_of_stream
gst_app_src_get_caps gst_app_src_get_caps
gst_app_src_get_emit_signals
gst_app_src_get_latency gst_app_src_get_latency
gst_app_src_get_max_bytes gst_app_src_get_max_bytes
gst_app_src_get_size gst_app_src_get_size
gst_app_src_get_stream_type gst_app_src_get_stream_type
gst_app_src_get_type gst_app_src_get_type
gst_app_src_push_buffer gst_app_src_push_buffer
gst_app_src_set_callbacks
gst_app_src_set_caps gst_app_src_set_caps
gst_app_src_set_emit_signals
gst_app_src_set_latency gst_app_src_set_latency
gst_app_src_set_max_bytes gst_app_src_set_max_bytes
gst_app_src_set_size gst_app_src_set_size