gst-libs/gst/app/gstappsink.*: Start some docs.

Original commit message from CVS:
* gst-libs/gst/app/gstappsink.c: (gst_app_sink_class_init),
(gst_app_sink_init), (gst_app_sink_set_property),
(gst_app_sink_get_property), (gst_app_sink_unlock_start),
(gst_app_sink_unlock_stop), (gst_app_sink_flush_unlocked),
(gst_app_sink_start), (gst_app_sink_stop), (gst_app_sink_event),
(gst_app_sink_preroll), (gst_app_sink_render),
(gst_app_sink_set_caps), (gst_app_sink_set_drop),
(gst_app_sink_get_drop):
* gst-libs/gst/app/gstappsink.h:
Start some docs.
Add property to drop buffers when the queue is filled
Fix unlocking and flushing when the queues are filled.
This commit is contained in:
Wim Taymans 2008-05-05 10:27:45 +00:00
parent de277a5b2a
commit 1275eda108
2 changed files with 170 additions and 25 deletions

View file

@ -17,6 +17,21 @@
* Boston, MA 02111-1307, USA. * Boston, MA 02111-1307, USA.
*/ */
/**
* SECTION:element-appsink
* @short_description: hand data to an application
* @see_also: #GstBaseSrc
*
* <refsect2>
* <para>
* Appsink is a sink plugin that supports many different methods for making
* the application get a handle on the GStreamer data in a pipeline.
* </para>
* </refsect2>
*
* Last reviewed on 2008-05-03 (0.10.8)
*/
#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
#include "config.h" #include "config.h"
#endif #endif
@ -54,8 +69,9 @@ enum
}; };
#define DEFAULT_PROP_EOS TRUE #define DEFAULT_PROP_EOS TRUE
#define DEFAULT_PROP_EMIT_SIGNALS TRUE #define DEFAULT_PROP_EMIT_SIGNALS FALSE
#define DEFAULT_PROP_MAX_BUFFERS 0 #define DEFAULT_PROP_MAX_BUFFERS 0
#define DEFAULT_PROP_DROP FALSE
enum enum
{ {
@ -64,8 +80,8 @@ enum
PROP_EOS, PROP_EOS,
PROP_EMIT_SIGNALS, PROP_EMIT_SIGNALS,
PROP_MAX_BUFFERS, PROP_MAX_BUFFERS,
PROP_DROP,
PROP_LAST, PROP_LAST
}; };
static GstStaticPadTemplate gst_app_sink_template = static GstStaticPadTemplate gst_app_sink_template =
@ -82,6 +98,8 @@ static void gst_app_sink_set_property (GObject * object, guint prop_id,
static void gst_app_sink_get_property (GObject * object, guint prop_id, static void gst_app_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec); GValue * value, GParamSpec * pspec);
static gboolean gst_app_sink_unlock_start (GstBaseSink * bsink);
static gboolean gst_app_sink_unlock_stop (GstBaseSink * bsink);
static gboolean gst_app_sink_start (GstBaseSink * psink); static gboolean gst_app_sink_start (GstBaseSink * psink);
static gboolean gst_app_sink_stop (GstBaseSink * psink); static gboolean gst_app_sink_stop (GstBaseSink * psink);
static gboolean gst_app_sink_event (GstBaseSink * sink, GstEvent * event); static gboolean gst_app_sink_event (GstBaseSink * sink, GstEvent * event);
@ -154,7 +172,7 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
g_object_class_install_property (gobject_class, PROP_CAPS, g_object_class_install_property (gobject_class, PROP_CAPS,
g_param_spec_boxed ("caps", "Caps", g_param_spec_boxed ("caps", "Caps",
"The caps of the sink pad", GST_TYPE_CAPS, "The allowed caps for the sink pad", GST_TYPE_CAPS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_EOS, g_object_class_install_property (gobject_class, PROP_EOS,
@ -169,10 +187,15 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS, g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
g_param_spec_uint ("max-buffers", "Max Buffers", g_param_spec_uint ("max-buffers", "Max Buffers",
"Control the maximum buffers to queue internally (0 = unlimited)", "The maximum number of buffers to queue internally (0 = unlimited)",
0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS, 0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_DROP,
g_param_spec_boolean ("drop", "Drop",
"Drop old buffers when the buffer queue is filled", DEFAULT_PROP_DROP,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/** /**
* GstAppSink::eos: * GstAppSink::eos:
* @appsink: the appsink element that emited the signal * @appsink: the appsink element that emited the signal
@ -262,9 +285,12 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
* *
* This function will only return buffers when the appsink is in the PLAYING * This function will only return buffers when the appsink is in the PLAYING
* state. All rendered buffers will be put in a queue so that the application * state. All rendered buffers will be put in a queue so that the application
* can pull buffers at its own rate. Note that when the application does not * can pull buffers at its own rate.
* pull buffers fast enough, the queued buffers could consume a lot of memory, *
* especially when dealing with raw video frames. * Note that when the application does not pull buffers fast enough, the
* queued buffers could consume a lot of memory, especially when dealing with
* raw video frames. It's possible to control the behaviour of the queue with
* the "drop" and "max-buffers" properties.
* *
* If an EOS event was received before any buffers, this function returns * If an EOS event was received before any buffers, this function returns
* %NULL. Use gst_app_sink_is_eos () to check for the EOS condition. * %NULL. Use gst_app_sink_is_eos () to check for the EOS condition.
@ -277,6 +303,8 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
pull_buffer), NULL, NULL, gst_app_marshal_OBJECT__VOID, pull_buffer), NULL, NULL, gst_app_marshal_OBJECT__VOID,
GST_TYPE_BUFFER, 0, G_TYPE_NONE); GST_TYPE_BUFFER, 0, G_TYPE_NONE);
basesink_class->unlock = gst_app_sink_unlock_start;
basesink_class->unlock_stop = gst_app_sink_unlock_stop;
basesink_class->start = gst_app_sink_start; basesink_class->start = gst_app_sink_start;
basesink_class->stop = gst_app_sink_stop; basesink_class->stop = gst_app_sink_stop;
basesink_class->event = gst_app_sink_event; basesink_class->event = gst_app_sink_event;
@ -297,6 +325,7 @@ gst_app_sink_init (GstAppSink * appsink, GstAppSinkClass * klass)
appsink->emit_signals = DEFAULT_PROP_EMIT_SIGNALS; appsink->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
appsink->max_buffers = DEFAULT_PROP_MAX_BUFFERS; appsink->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
appsink->drop = DEFAULT_PROP_DROP;
} }
static void static void
@ -349,6 +378,9 @@ gst_app_sink_set_property (GObject * object, guint prop_id,
case PROP_MAX_BUFFERS: case PROP_MAX_BUFFERS:
gst_app_sink_set_max_buffers (appsink, g_value_get_uint (value)); gst_app_sink_set_max_buffers (appsink, g_value_get_uint (value));
break; break;
case PROP_DROP:
gst_app_sink_set_drop (appsink, g_value_get_boolean (value));
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -381,18 +413,49 @@ gst_app_sink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_MAX_BUFFERS: case PROP_MAX_BUFFERS:
g_value_set_uint (value, gst_app_sink_get_max_buffers (appsink)); g_value_set_uint (value, gst_app_sink_get_max_buffers (appsink));
break; break;
case PROP_DROP:
g_value_set_boolean (value, gst_app_sink_get_drop (appsink));
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
} }
} }
static gboolean
gst_app_sink_unlock_start (GstBaseSink * bsink)
{
GstAppSink *appsink = GST_APP_SINK (bsink);
g_mutex_lock (appsink->mutex);
GST_DEBUG_OBJECT (appsink, "unlock start");
appsink->flushing = TRUE;
g_cond_signal (appsink->cond);
g_mutex_unlock (appsink->mutex);
return TRUE;
}
static gboolean
gst_app_sink_unlock_stop (GstBaseSink * bsink)
{
GstAppSink *appsink = GST_APP_SINK (bsink);
g_mutex_lock (appsink->mutex);
GST_DEBUG_OBJECT (appsink, "unlock stop");
appsink->flushing = FALSE;
g_cond_signal (appsink->cond);
g_mutex_unlock (appsink->mutex);
return TRUE;
}
static void static void
gst_app_sink_flush_unlocked (GstAppSink * appsink) gst_app_sink_flush_unlocked (GstAppSink * appsink)
{ {
GstBuffer *buffer; GstBuffer *buffer;
GST_DEBUG_OBJECT (appsink, "flushing appsink"); GST_DEBUG_OBJECT (appsink, "flush stop appsink");
appsink->is_eos = FALSE; appsink->is_eos = FALSE;
gst_buffer_replace (&appsink->preroll, NULL); gst_buffer_replace (&appsink->preroll, NULL);
while ((buffer = g_queue_pop_head (appsink->queue))) while ((buffer = g_queue_pop_head (appsink->queue)))
@ -406,9 +469,8 @@ gst_app_sink_start (GstBaseSink * psink)
GstAppSink *appsink = GST_APP_SINK (psink); GstAppSink *appsink = GST_APP_SINK (psink);
g_mutex_lock (appsink->mutex); g_mutex_lock (appsink->mutex);
appsink->is_eos = FALSE;
appsink->started = TRUE;
GST_DEBUG_OBJECT (appsink, "starting"); GST_DEBUG_OBJECT (appsink, "starting");
appsink->started = TRUE;
g_mutex_unlock (appsink->mutex); g_mutex_unlock (appsink->mutex);
return TRUE; return TRUE;
@ -421,6 +483,8 @@ gst_app_sink_stop (GstBaseSink * psink)
g_mutex_lock (appsink->mutex); g_mutex_lock (appsink->mutex);
GST_DEBUG_OBJECT (appsink, "stopping"); GST_DEBUG_OBJECT (appsink, "stopping");
appsink->is_eos = FALSE;
appsink->flushing = TRUE;
appsink->started = FALSE; appsink->started = FALSE;
gst_app_sink_flush_unlocked (appsink); gst_app_sink_flush_unlocked (appsink);
g_mutex_unlock (appsink->mutex); g_mutex_unlock (appsink->mutex);
@ -446,6 +510,9 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0); g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0);
break; break;
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
/* we don't have to do anything here, the base class will call unlock
* which will make sure we exit the _render method */
GST_DEBUG_OBJECT (appsink, "received FLUSH_START");
break; break;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
g_mutex_lock (appsink->mutex); g_mutex_lock (appsink->mutex);
@ -466,6 +533,9 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
gboolean emit; gboolean emit;
g_mutex_lock (appsink->mutex); g_mutex_lock (appsink->mutex);
if (appsink->flushing)
goto flushing;
GST_DEBUG_OBJECT (appsink, "setting preroll buffer %p", buffer); GST_DEBUG_OBJECT (appsink, "setting preroll buffer %p", buffer);
gst_buffer_replace (&appsink->preroll, buffer); gst_buffer_replace (&appsink->preroll, buffer);
g_cond_signal (appsink->cond); g_cond_signal (appsink->cond);
@ -476,6 +546,13 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_PREROLL], 0); g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_PREROLL], 0);
return GST_FLOW_OK; return GST_FLOW_OK;
flushing:
{
GST_DEBUG_OBJECT (appsink, "we are flushing");
g_mutex_unlock (appsink->mutex);
return GST_FLOW_WRONG_STATE;
}
} }
static GstFlowReturn static GstFlowReturn
@ -485,15 +562,31 @@ gst_app_sink_render (GstBaseSink * psink, GstBuffer * buffer)
gboolean emit; gboolean emit;
g_mutex_lock (appsink->mutex); g_mutex_lock (appsink->mutex);
GST_DEBUG_OBJECT (appsink, "pushing render buffer %p on queue", buffer); if (appsink->flushing)
goto flushing;
GST_DEBUG_OBJECT (appsink, "pushing render buffer %p on queue (%d)",
buffer, appsink->queue->length);
while (appsink->max_buffers > 0 && while (appsink->max_buffers > 0 &&
appsink->queue->length >= appsink->max_buffers) { appsink->queue->length >= appsink->max_buffers) {
/* FIXME, do proper unlocking when flushing */ if (appsink->drop) {
g_cond_wait (appsink->cond, appsink->mutex); GstBuffer *buf;
if (!appsink->started)
goto not_started; /* we need to drop the oldest buffer and try again */
buf = g_queue_pop_head (appsink->queue);
GST_DEBUG_OBJECT (appsink, "dropping old buffer %p", buf);
gst_buffer_unref (buf);
} else {
GST_DEBUG_OBJECT (appsink, "waiting for free space, length %d >= %d",
appsink->queue->length, appsink->max_buffers);
/* wait for a buffer to be removed or flush */
g_cond_wait (appsink->cond, appsink->mutex);
if (appsink->flushing)
goto flushing;
}
} }
/* we need to ref the buffer when pushing it in the queue */
g_queue_push_tail (appsink->queue, gst_buffer_ref (buffer)); g_queue_push_tail (appsink->queue, gst_buffer_ref (buffer));
g_cond_signal (appsink->cond); g_cond_signal (appsink->cond);
emit = appsink->emit_signals; emit = appsink->emit_signals;
@ -504,9 +597,9 @@ gst_app_sink_render (GstBaseSink * psink, GstBuffer * buffer)
return GST_FLOW_OK; return GST_FLOW_OK;
not_started: flushing:
{ {
GST_DEBUG_OBJECT (appsink, "we stopped"); GST_DEBUG_OBJECT (appsink, "we are flushing");
g_mutex_unlock (appsink->mutex); g_mutex_unlock (appsink->mutex);
return GST_FLOW_WRONG_STATE; return GST_FLOW_WRONG_STATE;
} }
@ -550,13 +643,14 @@ gst_app_sink_set_caps (GstAppSink * appsink, const GstCaps * caps)
GST_OBJECT_LOCK (appsink); GST_OBJECT_LOCK (appsink);
GST_DEBUG_OBJECT (appsink, "setting caps to %" GST_PTR_FORMAT, caps); GST_DEBUG_OBJECT (appsink, "setting caps to %" GST_PTR_FORMAT, caps);
old = appsink->caps; if ((old = appsink->caps) != caps) {
if (caps) if (caps)
appsink->caps = gst_caps_copy (caps); appsink->caps = gst_caps_copy (caps);
else else
appsink->caps = NULL; appsink->caps = NULL;
if (old) if (old)
gst_caps_unref (old); gst_caps_unref (old);
}
GST_OBJECT_UNLOCK (appsink); GST_OBJECT_UNLOCK (appsink);
} }
@ -715,6 +809,52 @@ gst_app_sink_get_max_buffers (GstAppSink * appsink)
return result; return result;
} }
/**
* gst_app_sink_set_drop:
* @appsink: a #GstAppSink
* @emit: the new state
*
* Instruct @appsink to drop old buffers when the maximum amount of queued
* buffers is reached.
*/
void
gst_app_sink_set_drop (GstAppSink * appsink, gboolean drop)
{
g_return_if_fail (GST_IS_APP_SINK (appsink));
g_mutex_lock (appsink->mutex);
if (appsink->drop != drop) {
appsink->drop = drop;
/* signal the change */
g_cond_signal (appsink->cond);
}
g_mutex_unlock (appsink->mutex);
}
/**
* gst_app_sink_get_drop:
* @appsink: a #GstAppSink
*
* Check if @appsink will drop old buffers when the maximum amount of queued
* buffers is reached.
*
* Returns: %TRUE if @appsink is dropping old buffers when the queue is
* filled.
*/
gboolean
gst_app_sink_get_drop (GstAppSink * appsink)
{
gboolean result;
g_return_val_if_fail (GST_IS_APP_SINK (appsink), FALSE);
g_mutex_lock (appsink->mutex);
result = appsink->drop;
g_mutex_unlock (appsink->mutex);
return result;
}
/** /**
* gst_app_sink_pull_preroll: * gst_app_sink_pull_preroll:
* @appsink: a #GstAppSink * @appsink: a #GstAppSink

View file

@ -47,11 +47,13 @@ struct _GstAppSink
GstCaps *caps; GstCaps *caps;
gboolean emit_signals; gboolean emit_signals;
guint max_buffers; guint max_buffers;
gboolean drop;
GCond *cond; GCond *cond;
GMutex *mutex; GMutex *mutex;
GQueue *queue; GQueue *queue;
GstBuffer *preroll; GstBuffer *preroll;
gboolean flushing;
gboolean started; gboolean started;
gboolean is_eos; gboolean is_eos;
}; };
@ -85,6 +87,9 @@ gboolean gst_app_sink_get_emit_signals (GstAppSink *appsink);
void gst_app_sink_set_max_buffers (GstAppSink *appsink, guint max); void gst_app_sink_set_max_buffers (GstAppSink *appsink, guint max);
guint gst_app_sink_get_max_buffers (GstAppSink *appsink); guint gst_app_sink_get_max_buffers (GstAppSink *appsink);
void gst_app_sink_set_drop (GstAppSink *appsink, gboolean drop);
gboolean gst_app_sink_get_drop (GstAppSink *appsink);
GstBuffer * gst_app_sink_pull_preroll (GstAppSink *appsink); GstBuffer * gst_app_sink_pull_preroll (GstAppSink *appsink);
GstBuffer * gst_app_sink_pull_buffer (GstAppSink *appsink); GstBuffer * gst_app_sink_pull_buffer (GstAppSink *appsink);