diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 0353310581..a3f7def4c2 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -556,6 +556,18 @@ "type": "gboolean", "writable": true }, + "leaky-type": { + "blurb": "Whether to drop buffers once the internal queue is full", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "none (0)", + "mutable": "ready", + "readable": true, + "type": "GstAppLeakyType", + "writable": true + }, "max-buffers": { "blurb": "The maximum number of buffers to queue internally (0 = unlimited)", "conditionally-available": false, diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c index 73d92211ba..4c9467602c 100644 --- a/gst-libs/gst/app/gstappsrc.c +++ b/gst-libs/gst/app/gstappsrc.c @@ -155,6 +155,13 @@ struct _GstAppSrcPrivate * the next buffer of buffer list */ gboolean pending_custom_segment; + /* the next buffer that will be queued needs a discont flag + * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */ + gboolean need_discont_upstream; + /* the next buffer that will be dequeued needs a discont flag + * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */ + gboolean need_discont_downstream; + gint64 size; GstClockTime duration; GstAppStreamType stream_type; @@ -180,6 +187,8 @@ struct _GstAppSrcPrivate guint min_percent; gboolean handle_segment_change; + GstAppLeakyType leaky_type; + Callbacks *callbacks; }; @@ -219,6 +228,7 @@ enum #define DEFAULT_PROP_CURRENT_LEVEL_TIME 0 #define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE #define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE +#define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE enum { @@ -241,6 +251,7 @@ enum PROP_CURRENT_LEVEL_TIME, PROP_DURATION, PROP_HANDLE_SEGMENT_CHANGE, + PROP_LEAKY_TYPE, PROP_LAST }; @@ -541,6 +552,24 @@ gst_app_src_class_init (GstAppSrcClass * klass) G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS)); + /** + * GstAppSrc:leaky-type: + * + * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc + * will drop any buffers that are pushed into it once its internal queue is + * full. The selected type defines whether to drop the oldest or new + * buffers. + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE, + g_param_spec_enum ("leaky-type", "Leaky Type", + "Whether to drop buffers once the internal queue is full", + GST_TYPE_APP_LEAKY_TYPE, + DEFAULT_PROP_LEAKY_TYPE, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | + G_PARAM_STATIC_STRINGS)); + /** * GstAppSrc::need-data: * @appsrc: the appsrc element that emitted the signal @@ -719,6 +748,7 @@ gst_app_src_init (GstAppSrc * appsrc) priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS; priv->min_percent = DEFAULT_PROP_MIN_PERCENT; priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE; + priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE; gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE); } @@ -750,6 +780,8 @@ gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps) priv->queued_time = 0; priv->last_in_running_time = GST_CLOCK_TIME_NONE; priv->last_out_running_time = GST_CLOCK_TIME_NONE; + priv->need_discont_upstream = FALSE; + priv->need_discont_downstream = FALSE; } static void @@ -878,6 +910,9 @@ gst_app_src_set_property (GObject * object, guint prop_id, case PROP_HANDLE_SEGMENT_CHANGE: priv->handle_segment_change = g_value_get_boolean (value); break; + case PROP_LEAKY_TYPE: + priv->leaky_type = g_value_get_enum (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -957,6 +992,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, case PROP_HANDLE_SEGMENT_CHANGE: g_value_set_boolean (value, priv->handle_segment_change); break; + case PROP_LEAKY_TYPE: + g_value_set_enum (value, priv->leaky_type); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1588,12 +1626,33 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, } if (GST_IS_BUFFER (obj)) { - *buf = GST_BUFFER (obj); + GstBuffer *buffer = GST_BUFFER (obj); + + /* Mark the buffer as DISCONT if we previously dropped a buffer + * instead of outputting it */ + if (priv->need_discont_downstream) { + buffer = gst_buffer_make_writable (buffer); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + priv->need_discont_downstream = FALSE; + } + + *buf = buffer; } else if (GST_IS_BUFFER_LIST (obj)) { GstBufferList *buffer_list; buffer_list = GST_BUFFER_LIST (obj); + /* Mark the first buffer of the buffer list as DISCONT if we + * previously dropped a buffer instead of outputting it */ + if (priv->need_discont_downstream) { + GstBuffer *buffer; + + buffer_list = gst_buffer_list_make_writable (buffer_list); + buffer = gst_buffer_list_get_writable (buffer_list, 0); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + priv->need_discont_downstream = FALSE; + } + gst_base_src_submit_buffer_list (bsrc, buffer_list); *buf = NULL; } else if (GST_IS_EVENT (obj)) { @@ -2223,6 +2282,45 @@ gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min, } } +/** + * gst_app_src_set_leaky_type: + * @appsrc: a #GstAppSrc + * @leaky: the #GstAppLeakyType + * + * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc + * will drop any buffers that are pushed into it once its internal queue is + * full. The selected type defines whether to drop the oldest or new + * buffers. + * + * Since: 1.20 + */ +void +gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky) +{ + g_return_if_fail (GST_IS_APP_SRC (appsrc)); + + appsrc->priv->leaky_type = leaky; +} + +/** + * gst_app_src_get_leaky_type: + * @appsrc: a #GstAppSrc + * + * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type() + * for more details. + * + * Returns: The currently set #GstAppLeakyType. + * + * Since: 1.20 + */ +GstAppLeakyType +gst_app_src_get_leaky_type (GstAppSrc * appsrc) +{ + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE); + + return appsrc->priv->leaky_type; +} + /** * gst_app_src_set_latency: * @appsrc: a #GstAppSrc @@ -2402,6 +2500,43 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, priv->max_buffers, GST_TIME_ARGS (priv->queued_time), GST_TIME_ARGS (priv->max_time)); + if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) { + priv->need_discont_upstream = TRUE; + goto dropped; + } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) { + guint i, length = gst_queue_array_get_length (priv->queue); + GstMiniObject *item = NULL; + + /* Find the oldest buffer or buffer list and drop it, then update the + * limits. Dropping one is sufficient to go below the limits again. + */ + for (i = 0; i < length; i++) { + item = gst_queue_array_peek_nth (priv->queue, i); + if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) { + gst_queue_array_drop_element (priv->queue, i); + break; + } + /* To not accidentally have an event after the loop */ + item = NULL; + } + + if (!item) { + GST_FIXME_OBJECT (appsrc, + "No buffer or buffer list queued but queue is full"); + /* This shouldn't really happen but in this case we can't really do + * anything apart from accepting the buffer / bufferlist */ + break; + } + + GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item); + + gst_app_src_update_queued_pop (appsrc, item, FALSE); + gst_mini_object_unref (item); + + priv->need_discont_downstream = TRUE; + continue; + } + if (first) { Callbacks *callbacks = NULL; gboolean emit; @@ -2438,8 +2573,9 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, * stops pushing buffers. */ break; } - } else + } else { break; + } } if (priv->pending_custom_segment) { @@ -2451,11 +2587,39 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, } if (buflist != NULL) { + /* Mark the first buffer of the buffer list as DISCONT if we previously + * dropped a buffer instead of queueing it */ + if (priv->need_discont_upstream) { + if (!steal_ref) { + buflist = gst_buffer_list_copy (buflist); + steal_ref = TRUE; + } else { + buflist = gst_buffer_list_make_writable (buflist); + } + buffer = gst_buffer_list_get_writable (buflist, 0); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + priv->need_discont_upstream = FALSE; + } + GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist); + if (!steal_ref) gst_buffer_list_ref (buflist); gst_queue_array_push_tail (priv->queue, buflist); } else { + /* Mark the buffer as DISCONT if we previously dropped a buffer instead of + * queueing it */ + if (priv->need_discont_upstream) { + if (!steal_ref) { + buffer = gst_buffer_copy (buffer); + steal_ref = TRUE; + } else { + buffer = gst_buffer_make_writable (buffer); + } + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + priv->need_discont_upstream = FALSE; + } + GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer); if (!steal_ref) gst_buffer_ref (buffer); @@ -2497,6 +2661,18 @@ eos: g_mutex_unlock (&priv->mutex); return GST_FLOW_EOS; } +dropped: + { + GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer); + if (steal_ref) { + if (buflist) + gst_buffer_list_unref (buflist); + else + gst_buffer_unref (buffer); + } + g_mutex_unlock (&priv->mutex); + return GST_FLOW_EOS; + } } static GstFlowReturn diff --git a/gst-libs/gst/app/gstappsrc.h b/gst-libs/gst/app/gstappsrc.h index b4d0041b09..16180fd165 100644 --- a/gst-libs/gst/app/gstappsrc.h +++ b/gst-libs/gst/app/gstappsrc.h @@ -88,6 +88,23 @@ typedef enum GST_APP_STREAM_TYPE_RANDOM_ACCESS } GstAppStreamType; +/** + * GstAppLeakyType: + * @GST_APP_LEAKY_TYPE_NONE: Not Leaky + * @GST_APP_LEAKY_TYPE_UPSTREAM: Leaky on upstream (new buffers) + * @GST_APP_LEAKY_TYPE_DOWNSTREAM: Leaky on downstream (old buffers) + * + * Buffer dropping scheme to avoid the element's internal queue to block when + * full. + * + * Since: 1.20 + */ +typedef enum { + GST_APP_LEAKY_TYPE_NONE, + GST_APP_LEAKY_TYPE_UPSTREAM, + GST_APP_LEAKY_TYPE_DOWNSTREAM +} GstAppLeakyType; + struct _GstAppSrc { GstBaseSrc basesrc; @@ -172,6 +189,12 @@ GstClockTime gst_app_src_get_max_time (GstAppSrc *appsrc); GST_APP_API GstClockTime gst_app_src_get_current_level_time (GstAppSrc *appsrc); +GST_APP_API +void gst_app_src_set_leaky_type (GstAppSrc *appsrc, GstAppLeakyType leaky); + +GST_APP_API +GstAppLeakyType gst_app_src_get_leaky_type (GstAppSrc *appsrc); + GST_APP_API void gst_app_src_set_latency (GstAppSrc *appsrc, guint64 min, guint64 max);