appsrc: add min-percent property

Emit need-data when the amount of data in the internal queue drops below
min-percent.

Fixes #608309
This commit is contained in:
Wim Taymans 2010-02-01 19:01:33 +01:00 committed by Wim Taymans
parent fac9346405
commit c94356ad9b

View file

@ -140,6 +140,7 @@ struct _GstAppSrcPrivate
guint64 min_latency; guint64 min_latency;
guint64 max_latency; guint64 max_latency;
gboolean emit_signals; gboolean emit_signals;
guint min_percent;
GstAppSrcCallbacks callbacks; GstAppSrcCallbacks callbacks;
gpointer user_data; gpointer user_data;
@ -172,6 +173,7 @@ enum
#define DEFAULT_PROP_MIN_LATENCY -1 #define DEFAULT_PROP_MIN_LATENCY -1
#define DEFAULT_PROP_MAX_LATENCY -1 #define DEFAULT_PROP_MAX_LATENCY -1
#define DEFAULT_PROP_EMIT_SIGNALS TRUE #define DEFAULT_PROP_EMIT_SIGNALS TRUE
#define DEFAULT_PROP_MIN_PERCENT 0
enum enum
{ {
@ -186,6 +188,7 @@ enum
PROP_MIN_LATENCY, PROP_MIN_LATENCY,
PROP_MAX_LATENCY, PROP_MAX_LATENCY,
PROP_EMIT_SIGNALS, PROP_EMIT_SIGNALS,
PROP_MIN_PERCENT,
PROP_LAST PROP_LAST
}; };
@ -400,6 +403,20 @@ gst_app_src_class_init (GstAppSrcClass * klass)
"Emit new-preroll and new-buffer signals", DEFAULT_PROP_EMIT_SIGNALS, "Emit new-preroll and new-buffer signals", DEFAULT_PROP_EMIT_SIGNALS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::empty-percent
*
* Make appsrc emit the "need-data" signal when the amount of bytes in the
* queue drops below this percentage of max-bytes.
*
* Since: 0.10.26
*/
g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
g_param_spec_uint ("min-percent", "Min Percent",
"Emit need-data when queued bytes drops below this percent of max-bytes",
0, 100, DEFAULT_PROP_MIN_PERCENT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/** /**
* GstAppSrc::need-data: * GstAppSrc::need-data:
* @appsrc: the appsrc element that emited the signal * @appsrc: the appsrc element that emited the signal
@ -517,6 +534,7 @@ gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
priv->min_latency = DEFAULT_PROP_MIN_LATENCY; priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
priv->max_latency = DEFAULT_PROP_MAX_LATENCY; priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS; priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE); gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
} }
@ -601,6 +619,9 @@ gst_app_src_set_property (GObject * object, guint prop_id,
case PROP_EMIT_SIGNALS: case PROP_EMIT_SIGNALS:
gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value)); gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
break; break;
case PROP_MIN_PERCENT:
priv->min_percent = g_value_get_uint (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -663,6 +684,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_EMIT_SIGNALS: case PROP_EMIT_SIGNALS:
g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc)); g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
break; break;
case PROP_MIN_PERCENT:
g_value_set_uint (value, priv->min_percent);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -860,6 +884,54 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
return res; return res;
} }
/* must be called with the appsrc mutex */
static gboolean
gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
{
gboolean res;
gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv;
emit = priv->emit_signals;
g_mutex_unlock (priv->mutex);
GST_DEBUG_OBJECT (appsrc,
"we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
priv->offset, offset);
if (priv->callbacks.seek_data)
res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
offset, &res);
g_mutex_lock (priv->mutex);
return res;
}
/* must be called with the appsrc mutex. After this call things can be
* flushing */
static void
gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
{
gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv;
emit = priv->emit_signals;
g_mutex_unlock (priv->mutex);
/* we have no data, we need some. We fire the signal with the size hint. */
if (priv->callbacks.need_data)
priv->callbacks.need_data (appsrc, size, priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
NULL);
g_mutex_lock (priv->mutex);
/* we can be flushing now because we released the lock */
}
static GstFlowReturn static GstFlowReturn
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
GstBuffer ** buf) GstBuffer ** buf)
@ -878,27 +950,14 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
* changed. */ * changed. */
if (G_UNLIKELY (priv->offset != offset)) { if (G_UNLIKELY (priv->offset != offset)) {
gboolean res; gboolean res;
gboolean emit;
emit = priv->emit_signals; /* do the seek */
g_mutex_unlock (priv->mutex); res = gst_app_src_emit_seek (appsrc, offset);
GST_DEBUG_OBJECT (appsrc,
"we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
priv->offset, offset);
if (priv->callbacks.seek_data)
res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
offset, &res);
if (G_UNLIKELY (!res)) if (G_UNLIKELY (!res))
/* failing to seek is fatal */ /* failing to seek is fatal */
goto seek_error; goto seek_error;
g_mutex_lock (priv->mutex);
priv->offset = offset; priv->offset = offset;
} }
} }
@ -916,32 +975,26 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
priv->queued_bytes -= buf_size; priv->queued_bytes -= buf_size;
/* only update the offset when in random_access mode */ /* only update the offset when in random_access mode */
if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) { if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
priv->offset += buf_size; priv->offset += buf_size;
}
gst_buffer_set_caps (*buf, priv->caps); gst_buffer_set_caps (*buf, priv->caps);
/* signal that we removed an item */ /* signal that we removed an item */
g_cond_broadcast (priv->cond); g_cond_broadcast (priv->cond);
/* see if we go lower than the empty-percent */
if (priv->min_percent && priv->max_bytes) {
if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
/* ignore flushing state, we got a buffer and we will return it now.
* Errors will be handled in the next round */
gst_app_src_emit_need_data (appsrc, size);
}
ret = GST_FLOW_OK; ret = GST_FLOW_OK;
break; break;
} else { } else {
gboolean emit; gst_app_src_emit_need_data (appsrc, size);
emit = priv->emit_signals; /* we can be flushing now because we released the lock above */
g_mutex_unlock (priv->mutex);
/* we have no data, we need some. We fire the signal with the size hint. */
if (priv->callbacks.need_data)
priv->callbacks.need_data (appsrc, size, priv->user_data);
else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
NULL);
g_mutex_lock (priv->mutex);
/* we can be flushing now because we released the lock */
if (G_UNLIKELY (priv->flushing)) if (G_UNLIKELY (priv->flushing))
goto flushing; goto flushing;
@ -982,6 +1035,7 @@ eos:
} }
seek_error: seek_error:
{ {
g_mutex_unlock (priv->mutex);
GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"), GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
GST_ERROR_SYSTEM); GST_ERROR_SYSTEM);
return GST_FLOW_ERROR; return GST_FLOW_ERROR;
@ -1238,8 +1292,8 @@ static void
gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min, gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
gboolean do_max, guint64 max) gboolean do_max, guint64 max)
{ {
gboolean changed = FALSE;
GstAppSrcPrivate *priv = appsrc->priv; GstAppSrcPrivate *priv = appsrc->priv;
gboolean changed = FALSE;
g_mutex_lock (priv->mutex); g_mutex_lock (priv->mutex);
if (do_min && priv->min_latency != min) { if (do_min && priv->min_latency != min) {