appsink: add max-time and max-buffers properties

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5321>
This commit is contained in:
Maksym Khomenko 2023-09-13 00:39:52 +03:00 committed by GStreamer Marge Bot
parent 4c13ccec16
commit 60591960c3
5 changed files with 426 additions and 65 deletions

View file

@ -79,8 +79,8 @@ which accept a timeout parameter to limit the amount of time to wait.
Appsink will internally use a queue to collect buffers from the streaming
thread. If the application is not pulling samples fast enough, this queue
will consume a lot of memory over time. The "max-buffers" property can be
used to limit the queue size. The "drop" property controls whether the
will consume a lot of memory over time. The "max-buffers", "max-time" and "max-bytes"
properties can be used to limit the queue size. The "drop" property controls whether the
streaming thread blocks or if older buffers are dropped when the maximum
queue size is reached. Note that blocking the streaming thread can negatively
affect real-time performance and should be avoided.
@ -325,7 +325,7 @@ condition.</doc>
</method>
<method name="get_drop" c:identifier="gst_app_sink_get_drop">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Check if @appsink will drop old buffers when the maximum amount of queued
buffers is reached.</doc>
data is reached (meaning max buffers, time or bytes limit, whichever is hit first).</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.h"/>
<return-value transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">%TRUE if @appsink is dropping old buffers when the queue is
@ -368,6 +368,34 @@ signals.</doc>
</instance-parameter>
</parameters>
</method>
<method name="get_max_bytes" c:identifier="gst_app_sink_get_max_bytes" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Get the maximum total size, in bytes, that can be queued in @appsink.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.h"/>
<return-value transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">The maximum amount of bytes that can be queued</doc>
<type name="guint64" c:type="guint64"/>
</return-value>
<parameters>
<instance-parameter name="appsink" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">a #GstAppSink</doc>
<type name="AppSink" c:type="GstAppSink*"/>
</instance-parameter>
</parameters>
</method>
<method name="get_max_time" c:identifier="gst_app_sink_get_max_time" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Get the maximum total duration that can be queued in @appsink.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.h"/>
<return-value transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">The maximum total duration that can be queued.</doc>
<type name="Gst.ClockTime" c:type="GstClockTime"/>
</return-value>
<parameters>
<instance-parameter name="appsink" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">a #GstAppSink</doc>
<type name="AppSink" c:type="GstAppSink*"/>
</instance-parameter>
</parameters>
</method>
<method name="get_wait_on_eos" c:identifier="gst_app_sink_get_wait_on_eos">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Check if @appsink will wait for all buffers to be consumed when an EOS is
received.</doc>
@ -567,7 +595,7 @@ you must check the caps on the samples to get the actual used caps.</doc>
</method>
<method name="set_drop" c:identifier="gst_app_sink_set_drop">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Instruct @appsink to drop old buffers when the maximum amount of queued
buffers is reached.</doc>
data is reached, that is, when any configured limit is hit (max-buffers, max-time or max-bytes).</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.h"/>
<return-value transfer-ownership="none">
<type name="none" c:type="void"/>
@ -605,7 +633,8 @@ the application prefers to operate in pull mode.</doc>
<method name="set_max_buffers" c:identifier="gst_app_sink_set_max_buffers">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Set the maximum amount of buffers that can be queued in @appsink. After this
amount of buffers are queued in appsink, any more buffers will block upstream
elements until a sample is pulled from @appsink.</doc>
elements until a sample is pulled from @appsink, unless 'drop' is set, in which
case new buffers will be discarded.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.h"/>
<return-value transfer-ownership="none">
<type name="none" c:type="void"/>
@ -621,6 +650,46 @@ elements until a sample is pulled from @appsink.</doc>
</parameter>
</parameters>
</method>
<method name="set_max_bytes" c:identifier="gst_app_sink_set_max_bytes" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Set the maximum total size that can be queued in @appsink. After this
amount of buffers are queued in appsink, any more buffers will block upstream
elements until a sample is pulled from @appsink, unless 'drop' is set, in which
case new buffers will be discarded.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.h"/>
<return-value transfer-ownership="none">
<type name="none" c:type="void"/>
</return-value>
<parameters>
<instance-parameter name="appsink" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">a #GstAppSink</doc>
<type name="AppSink" c:type="GstAppSink*"/>
</instance-parameter>
<parameter name="max" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">the maximum total size of buffers to queue, in bytes</doc>
<type name="guint64" c:type="guint64"/>
</parameter>
</parameters>
</method>
<method name="set_max_time" c:identifier="gst_app_sink_set_max_time" version="1.24">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Set the maximum total duration that can be queued in @appsink. After this
amount of buffers are queued in appsink, any more buffers will block upstream
elements until a sample is pulled from @appsink, unless 'drop' is set, in which
case new buffers will be discarded.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.h"/>
<return-value transfer-ownership="none">
<type name="none" c:type="void"/>
</return-value>
<parameters>
<instance-parameter name="appsink" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">a #GstAppSink</doc>
<type name="AppSink" c:type="GstAppSink*"/>
</instance-parameter>
<parameter name="max" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">the maximum total duration to queue</doc>
<type name="Gst.ClockTime" c:type="GstClockTime"/>
</parameter>
</parameters>
</method>
<method name="set_wait_on_eos" c:identifier="gst_app_sink_set_wait_on_eos">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Instruct @appsink to wait for all buffers to be consumed when an EOS is received.</doc>
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.h"/>
@ -758,9 +827,22 @@ condition.</doc>
<type name="gboolean" c:type="gboolean"/>
</property>
<property name="max-buffers" writable="1" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Maximum amount of buffers in the queue (0 = unlimited).</doc>
<type name="guint" c:type="guint"/>
</property>
<property name="wait-on-eos" writable="1" transfer-ownership="none">
<property name="max-bytes" version="1.24" writable="1" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Maximum amount of bytes in the queue (0 = unlimited)</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="max-time" version="1.24" writable="1" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Maximum total duration of data in the queue (0 = unlimited)</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="wait-on-eos" version="1.8" writable="1" transfer-ownership="none">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Wait for all buffers to be processed after receiving an EOS.
In cases where it is uncertain if an @appsink will have a consumer for its buffers
when it receives an EOS, set to %FALSE to ensure that the @appsink will not hang.</doc>
<type name="gboolean" c:type="gboolean"/>
</property>
<field name="basesink">
@ -885,7 +967,7 @@ can pull samples at its own rate.
Note that when the application does not pull samples fast enough, the
queued samples could consume a lot of memory, especially when dealing with
raw video frames. It's possible to control the behaviour of the queue with
the "drop" and "max-buffers" properties.
the "drop" and "max-buffers" / "max-bytes" / "max-time" set of properties.
If an EOS event was received before any buffers, this function returns
%NULL. Use gst_app_sink_is_eos () to check for the EOS condition.</doc>
@ -906,7 +988,7 @@ Events can be pulled when the appsink is in the READY, PAUSED or PLAYING state.
Note that when the application does not pull samples fast enough, the
queued samples could consume a lot of memory, especially when dealing with
raw video frames. It's possible to control the behaviour of the queue with
the "drop" and "max-buffers" properties.
the "drop" and "max-buffers" / "max-bytes" / "max-time" set of properties.
This function will only pull serialized events, excluding
the EOS event for which this functions returns
@ -971,7 +1053,7 @@ can pull samples at its own rate.
Note that when the application does not pull samples fast enough, the
queued samples could consume a lot of memory, especially when dealing with
raw video frames. It's possible to control the behaviour of the queue with
the "drop" and "max-buffers" properties.
the "drop" and "max-buffers" / "max-bytes" / "max-time" set of properties.
If an EOS event was received before any buffers or the timeout expires,
this function returns %NULL. Use gst_app_sink_is_eos () to check

View file

@ -339,6 +339,34 @@
"type": "guint",
"writable": true
},
"max-bytes": {
"blurb": "The maximum amount of bytes to queue internally (0 = unlimited)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"max-time": {
"blurb": "The maximum total duration to queue internally (in ns, 0 = unlimited)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"wait-on-eos": {
"blurb": "Wait for all buffers to be processed after receiving an EOS",
"conditionally-available": false,

View file

@ -40,8 +40,8 @@
*
* Appsink will internally use a queue to collect buffers from the streaming
* thread. If the application is not pulling samples fast enough, this queue
* will consume a lot of memory over time. The "max-buffers" property can be
* used to limit the queue size. The "drop" property controls whether the
* will consume a lot of memory over time. The "max-buffers", "max-time" and "max-bytes"
* properties can be used to limit the queue size. The "drop" property controls whether the
* streaming thread blocks or if older buffers are dropped when the maximum
* queue size is reached. Note that blocking the streaming thread can negatively
* affect real-time performance and should be avoided.
@ -72,6 +72,7 @@
#include <string.h>
#include "gstappsink.h"
#include "gstapputils.h"
typedef enum
{
@ -112,12 +113,13 @@ struct _GstAppSinkPrivate
{
GstCaps *caps;
gboolean emit_signals;
guint num_buffers;
guint num_events;
guint max_buffers;
guint64 max_buffers;
GstClockTime max_time;
guint64 max_bytes;
gboolean drop;
gboolean wait_on_eos;
GstAppSinkWaitStatus wait_status;
GstQueueStatusInfo queue_status_info;
GCond cond;
GMutex mutex;
@ -163,6 +165,8 @@ enum
#define DEFAULT_PROP_EOS TRUE
#define DEFAULT_PROP_EMIT_SIGNALS FALSE
#define DEFAULT_PROP_MAX_BUFFERS 0
#define DEFAULT_PROP_MAX_TIME 0
#define DEFAULT_PROP_MAX_BYTES 0
#define DEFAULT_PROP_DROP FALSE
#define DEFAULT_PROP_WAIT_ON_EOS TRUE
#define DEFAULT_PROP_BUFFER_LIST FALSE
@ -177,6 +181,8 @@ enum
PROP_DROP,
PROP_WAIT_ON_EOS,
PROP_BUFFER_LIST,
PROP_MAX_TIME,
PROP_MAX_BYTES,
PROP_LAST
};
@ -255,12 +261,43 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
DEFAULT_PROP_EMIT_SIGNALS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSink:max-buffers:
*
* Maximum amount of buffers in the queue (0 = unlimited).
*/
g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
g_param_spec_uint ("max-buffers", "Max Buffers",
"The maximum number of buffers to queue internally (0 = unlimited)",
0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSink:max-time:
*
* Maximum total duration of data in the queue (0 = unlimited)
*
* Since: 1.24
*/
g_object_class_install_property (gobject_class, PROP_MAX_TIME,
g_param_spec_uint64 ("max-time", "Max time",
"The maximum total duration to queue internally (in ns, 0 = unlimited)",
0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSink:max-bytes:
*
* Maximum amount of bytes in the queue (0 = unlimited)
*
* Since: 1.24
*/
g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
g_param_spec_uint64 ("max-bytes", "Max bytes",
"The maximum amount of bytes to queue internally (0 = unlimited)",
0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_DROP,
g_param_spec_boolean ("drop", "Drop",
"Drop old buffers when the buffer queue is filled", DEFAULT_PROP_DROP,
@ -270,8 +307,9 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
g_param_spec_boolean ("buffer-list", "Buffer List",
"Use buffer lists", DEFAULT_PROP_BUFFER_LIST,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSink::wait-on-eos:
* GstAppSink:wait-on-eos:
*
* Wait for all buffers to be processed after receiving an EOS.
*
@ -425,7 +463,7 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
* Note that when the application does not pull samples fast enough, the
* queued samples could consume a lot of memory, especially when dealing with
* raw video frames. It's possible to control the behaviour of the queue with
* the "drop" and "max-buffers" properties.
* the "drop" and "max-buffers" / "max-bytes" / "max-time" set of properties.
*
* If an EOS event was received before any buffers, this function returns
* %NULL. Use gst_app_sink_is_eos () to check for the EOS condition.
@ -487,7 +525,7 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
* Note that when the application does not pull samples fast enough, the
* queued samples could consume a lot of memory, especially when dealing with
* raw video frames. It's possible to control the behaviour of the queue with
* the "drop" and "max-buffers" properties.
* the "drop" and "max-buffers" / "max-bytes" / "max-time" set of properties.
*
* If an EOS event was received before any buffers or the timeout expires,
* this function returns %NULL. Use gst_app_sink_is_eos () to check
@ -519,7 +557,7 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
* Note that when the application does not pull samples fast enough, the
* queued samples could consume a lot of memory, especially when dealing with
* raw video frames. It's possible to control the behaviour of the queue with
* the "drop" and "max-buffers" properties.
* the "drop" and "max-buffers" / "max-bytes" / "max-time" set of properties.
*
* This function will only pull serialized events, excluding
* the EOS event for which this functions returns
@ -582,6 +620,8 @@ gst_app_sink_init (GstAppSink * appsink)
priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
priv->max_time = DEFAULT_PROP_MAX_TIME;
priv->drop = DEFAULT_PROP_DROP;
priv->wait_on_eos = DEFAULT_PROP_WAIT_ON_EOS;
priv->buffer_lists_supported = DEFAULT_PROP_BUFFER_LIST;
@ -651,6 +691,12 @@ gst_app_sink_set_property (GObject * object, guint prop_id,
case PROP_MAX_BUFFERS:
gst_app_sink_set_max_buffers (appsink, g_value_get_uint (value));
break;
case PROP_MAX_TIME:
gst_app_sink_set_max_time (appsink, g_value_get_uint64 (value));
break;
case PROP_MAX_BYTES:
gst_app_sink_set_max_bytes (appsink, g_value_get_uint64 (value));
break;
case PROP_DROP:
gst_app_sink_set_drop (appsink, g_value_get_boolean (value));
break;
@ -693,6 +739,12 @@ gst_app_sink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_MAX_BUFFERS:
g_value_set_uint (value, gst_app_sink_get_max_buffers (appsink));
break;
case PROP_MAX_TIME:
g_value_set_uint64 (value, gst_app_sink_get_max_time (appsink));
break;
case PROP_MAX_BYTES:
g_value_set_uint64 (value, gst_app_sink_get_max_bytes (appsink));
break;
case PROP_DROP:
g_value_set_boolean (value, gst_app_sink_get_drop (appsink));
break;
@ -750,8 +802,9 @@ gst_app_sink_flush_unlocked (GstAppSink * appsink)
gst_buffer_replace (&priv->preroll_buffer, NULL);
while ((obj = gst_queue_array_pop_head (priv->queue)))
gst_mini_object_unref (obj);
priv->num_buffers = 0;
priv->num_events = 0;
gst_queue_status_info_reset (&priv->queue_status_info);
gst_caps_replace (&priv->last_caps, NULL);
g_cond_signal (&priv->cond);
}
@ -814,8 +867,10 @@ gst_app_sink_setcaps (GstBaseSink * sink, GstCaps * caps)
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsink, "receiving CAPS");
gst_queue_array_push_tail (priv->queue, gst_event_new_caps (caps));
priv->num_events++;
gst_queue_status_info_push_event (&priv->queue_status_info);
if (!priv->preroll_buffer)
gst_caps_replace (&priv->preroll_caps, caps);
g_mutex_unlock (&priv->mutex);
@ -850,11 +905,13 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
g_mutex_unlock (&priv->mutex);
g_mutex_lock (&priv->mutex);
/* wait until all buffers are consumed or we're flushing.
* Otherwise we might signal EOS before all buffers are
/* Wait until all buffers are consumed, or we're flushing.
* Otherwise, we might signal EOS before all buffers are
* consumed, which is a bit confusing for the application
*/
while (priv->num_buffers > 0 && !priv->flushing && priv->wait_on_eos) {
while (priv->queue_status_info.queued_buffers > 0 && !priv->flushing
&& priv->wait_on_eos) {
if (priv->unlock) {
/* we are asked to unlock, call the wait_preroll method */
g_mutex_unlock (&priv->mutex);
@ -920,7 +977,7 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
callbacks = callbacks_ref (priv->callbacks);
gst_queue_array_push_tail (priv->queue, gst_event_ref (event));
priv->num_events++;
gst_queue_status_info_push_event (&priv->queue_status_info);
g_mutex_unlock (&priv->mutex);
@ -998,12 +1055,9 @@ dequeue_object (GstAppSink * appsink)
if (GST_IS_BUFFER (obj) || GST_IS_BUFFER_LIST (obj)) {
GST_DEBUG_OBJECT (appsink, "dequeued buffer/list %p", obj);
priv->num_buffers--;
} else if (GST_IS_EVENT (obj)) {
GstEvent *event = GST_EVENT_CAST (obj);
priv->num_events--;
switch (GST_EVENT_TYPE (obj)) {
case GST_EVENT_CAPS:
{
@ -1028,6 +1082,10 @@ dequeue_object (GstAppSink * appsink)
}
}
/* We don't have last/current segment differentiation in appsink, so pass last_segment twice */
gst_queue_status_info_pop (&priv->queue_status_info, obj,
&priv->last_segment, &priv->last_segment, GST_OBJECT_CAST (appsink));
return obj;
}
@ -1075,10 +1133,12 @@ restart:
priv->last_caps);
}
GST_DEBUG_OBJECT (appsink, "pushing render buffer/list %p on queue (%d)",
data, priv->num_buffers);
GST_DEBUG_OBJECT (appsink,
"pushing render buffer/list %p on queue (%" G_GUINT64_FORMAT ")", data,
priv->queue_status_info.queued_buffers);
while (priv->max_buffers > 0 && priv->num_buffers >= priv->max_buffers) {
while (gst_queue_status_info_is_full (&priv->queue_status_info,
priv->max_buffers, priv->max_bytes, priv->max_time)) {
if (priv->drop) {
GstMiniObject *old;
@ -1088,8 +1148,13 @@ restart:
gst_mini_object_unref (old);
}
} else {
GST_DEBUG_OBJECT (appsink, "waiting for free space, length %d >= %d",
priv->num_buffers, priv->max_buffers);
GST_DEBUG_OBJECT (appsink,
"waiting for free space: have %" G_GUINT64_FORMAT " buffers (max %"
G_GUINT64_FORMAT "), %" G_GUINT64_FORMAT " bytes (max %"
G_GUINT64_FORMAT "), %" G_GUINT64_FORMAT " time (max %"
G_GUINT64_FORMAT ")", priv->queue_status_info.queued_buffers,
priv->max_buffers, priv->queue_status_info.queued_bytes,
priv->max_bytes, priv->queue_status_info.queued_time, priv->max_time);
if (priv->unlock) {
/* we are asked to unlock, call the wait_preroll method */
@ -1112,7 +1177,8 @@ restart:
}
/* we need to ref the buffer/list when pushing it in the queue */
gst_queue_array_push_tail (priv->queue, gst_mini_object_ref (data));
priv->num_buffers++;
gst_queue_status_info_push (&priv->queue_status_info, data,
&priv->last_segment, GST_OBJECT_CAST (appsink));
if ((priv->wait_status & APP_WAITING))
g_cond_signal (&priv->cond);
@ -1215,7 +1281,7 @@ gst_app_sink_query (GstBaseSink * bsink, GstQuery * query)
{
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsink, "waiting buffers to be consumed");
while (priv->num_buffers > 0 || priv->preroll_buffer) {
while (priv->queue_status_info.queued_buffers > 0 || priv->preroll_buffer) {
if (priv->unlock) {
/* we are asked to unlock, call the wait_preroll method */
g_mutex_unlock (&priv->mutex);
@ -1346,7 +1412,7 @@ gst_app_sink_is_eos (GstAppSink * appsink)
if (!priv->started)
goto not_started;
if (priv->is_eos && priv->num_buffers == 0) {
if (priv->is_eos && priv->queue_status_info.queued_buffers == 0) {
GST_DEBUG_OBJECT (appsink, "we are EOS and the queue is empty");
ret = TRUE;
} else {
@ -1414,6 +1480,41 @@ gst_app_sink_get_emit_signals (GstAppSink * appsink)
return result;
}
#define GST_APP_SINK_GET_PROPERTY(prop_name) \
G_STMT_START { \
GstAppSinkPrivate *priv; \
guint result; \
\
g_return_val_if_fail (GST_IS_APP_SINK (appsink), 0); \
\
priv = appsink->priv; \
\
g_mutex_lock (&priv->mutex); \
result = priv->prop_name; \
g_mutex_unlock (&priv->mutex); \
\
return result; \
} G_STMT_END
#define GST_APP_SINK_SET_PROPERTY(prop_name, value) \
G_STMT_START { \
GstAppSinkPrivate *priv; \
\
g_return_if_fail (GST_IS_APP_SINK (appsink)); \
\
priv = appsink->priv; \
\
g_mutex_lock (&priv->mutex); \
\
if (value != priv->prop_name) { \
priv->prop_name = value; \
/* signal the change */ \
g_cond_signal (&priv->cond); \
} \
\
g_mutex_unlock (&priv->mutex); \
} G_STMT_END
/**
* gst_app_sink_set_max_buffers:
* @appsink: a #GstAppSink
@ -1421,24 +1522,49 @@ gst_app_sink_get_emit_signals (GstAppSink * appsink)
*
* Set the maximum amount of buffers that can be queued in @appsink. After this
* amount of buffers are queued in appsink, any more buffers will block upstream
* elements until a sample is pulled from @appsink.
* elements until a sample is pulled from @appsink, unless 'drop' is set, in which
* case new buffers will be discarded.
*/
void
gst_app_sink_set_max_buffers (GstAppSink * appsink, guint max)
{
GstAppSinkPrivate *priv;
g_return_if_fail (GST_IS_APP_SINK (appsink));
priv = appsink->priv;
g_mutex_lock (&priv->mutex);
if (max != priv->max_buffers) {
priv->max_buffers = max;
/* signal the change */
g_cond_signal (&priv->cond);
GST_APP_SINK_SET_PROPERTY (max_buffers, max);
}
g_mutex_unlock (&priv->mutex);
/**
* gst_app_sink_set_max_time:
* @appsink: a #GstAppSink
* @max: the maximum total duration to queue
*
* Set the maximum total duration that can be queued in @appsink. After this
* amount of buffers are queued in appsink, any more buffers will block upstream
* elements until a sample is pulled from @appsink, unless 'drop' is set, in which
* case new buffers will be discarded.
*
* Since: 1.24
*/
void
gst_app_sink_set_max_time (GstAppSink * appsink, GstClockTime max)
{
GST_APP_SINK_SET_PROPERTY (max_time, max);
}
/**
* gst_app_sink_set_max_bytes:
* @appsink: a #GstAppSink
* @max: the maximum total size of buffers to queue, in bytes
*
* Set the maximum total size that can be queued in @appsink. After this
* amount of buffers are queued in appsink, any more buffers will block upstream
* elements until a sample is pulled from @appsink, unless 'drop' is set, in which
* case new buffers will be discarded.
*
* Since: 1.24
*/
void
gst_app_sink_set_max_bytes (GstAppSink * appsink, guint64 max)
{
GST_APP_SINK_SET_PROPERTY (max_bytes, max);
}
/**
@ -1452,27 +1578,51 @@ gst_app_sink_set_max_buffers (GstAppSink * appsink, guint max)
guint
gst_app_sink_get_max_buffers (GstAppSink * appsink)
{
guint result;
GstAppSinkPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SINK (appsink), 0);
priv = appsink->priv;
g_mutex_lock (&priv->mutex);
result = priv->max_buffers;
g_mutex_unlock (&priv->mutex);
return result;
GST_APP_SINK_GET_PROPERTY (max_buffers);
}
/**
* gst_app_sink_get_max_time:
* @appsink: a #GstAppSink
*
* Get the maximum total duration that can be queued in @appsink.
*
* Returns: The maximum total duration that can be queued.
*
* Since: 1.24
*/
GstClockTime
gst_app_sink_get_max_time (GstAppSink * appsink)
{
GST_APP_SINK_GET_PROPERTY (max_time);
}
/**
* gst_app_sink_get_max_bytes:
* @appsink: a #GstAppSink
*
* Get the maximum total size, in bytes, that can be queued in @appsink.
*
* Returns: The maximum amount of bytes that can be queued
*
* Since: 1.24
*/
guint64
gst_app_sink_get_max_bytes (GstAppSink * appsink)
{
GST_APP_SINK_GET_PROPERTY (max_bytes);
}
#undef GST_APP_SINK_GET_PROPERTY
#undef GST_APP_SINK_SET_PROPERTY
/**
* gst_app_sink_set_drop:
* @appsink: a #GstAppSink
* @drop: the new state
*
* Instruct @appsink to drop old buffers when the maximum amount of queued
* buffers is reached.
* data is reached, that is, when any configured limit is hit (max-buffers, max-time or max-bytes).
*/
void
gst_app_sink_set_drop (GstAppSink * appsink, gboolean drop)
@ -1497,7 +1647,7 @@ gst_app_sink_set_drop (GstAppSink * appsink, gboolean drop)
* @appsink: a #GstAppSink
*
* Check if @appsink will drop old buffers when the maximum amount of queued
* buffers is reached.
* data is reached (meaning max buffers, time or bytes limit, whichever is hit first).
*
* Returns: %TRUE if @appsink is dropping old buffers when the queue is
* filled.
@ -1920,7 +2070,8 @@ gst_app_sink_try_pull_object (GstAppSink * appsink, GstClockTime timeout)
if (!priv->started)
goto not_started;
if (priv->num_buffers > 0 || priv->num_events > 0)
if (priv->queue_status_info.queued_buffers > 0
|| priv->queue_status_info.num_events > 0)
break;
if (priv->is_eos)

View file

@ -151,6 +151,18 @@ void gst_app_sink_set_max_buffers (GstAppSink *appsink, guint max);
GST_APP_API
guint gst_app_sink_get_max_buffers (GstAppSink *appsink);
GST_APP_API
void gst_app_sink_set_max_time (GstAppSink *appsink, GstClockTime max);
GST_APP_API
GstClockTime gst_app_sink_get_max_time (GstAppSink *appsink);
GST_APP_API
void gst_app_sink_set_max_bytes (GstAppSink *appsink, guint64 max);
GST_APP_API
guint64 gst_app_sink_get_max_bytes (GstAppSink *appsink);
GST_APP_API
void gst_app_sink_set_drop (GstAppSink *appsink, gboolean drop);
@ -198,4 +210,3 @@ G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstAppSink, gst_object_unref)
G_END_DECLS
#endif

View file

@ -200,7 +200,7 @@ GST_END_TEST;
static const gint values[] = { 1, 2, 4 };
static GstBufferList *
create_buffer_list (void)
create_buffer_list_with_buffer_duration (GstClockTime duration)
{
guint len;
GstBuffer *buffer;
@ -224,9 +224,27 @@ create_buffer_list (void)
gst_buffer_fill (buffer, 0, &values[2], sizeof (gint));
gst_buffer_list_add (mylist, buffer);
if (GST_CLOCK_TIME_IS_VALID (duration)) {
guint i;
GstClockTime dts = 0;
GstBuffer *current;
for (i = 0; i < gst_buffer_list_length (mylist); ++i) {
current = gst_buffer_list_get (mylist, i);
GST_BUFFER_DTS (current) = dts;
dts += duration;
}
}
return mylist;
}
static GstBufferList *
create_buffer_list (void)
{
return create_buffer_list_with_buffer_duration (GST_CLOCK_TIME_NONE);
}
static GstFlowReturn
callback_function_sample_fallback (GstAppSink * appsink, gpointer p_counter)
{
@ -1155,6 +1173,75 @@ GST_START_TEST (test_query_allocation_signals)
GST_END_TEST;
struct TestBufferingLimitsParams
{
guint64 max_time;
guint max_buffers;
guint max_bytes;
guint expected_num_samples;
};
static struct TestBufferingLimitsParams test_buffering_limit_params[] = {
/* no limits */
{0, 0, 0, 3},
/* exceeded time limit and max-time is not a multiple of buffer duration:
* this effectively means the queue will have to pass one additional buffer before blocking/dropping */
{50 * GST_MSECOND, 0, 0, 3},
/* exceeded buffers limit */
{0, 2, 0, 2},
/* exceeded bytes limit */
{0, 0, 2 * sizeof (guint), 2},
/* time and bytes; time is exceeded first */
{20 * GST_MSECOND, 0, 2 * sizeof (guint), 1},
/* time, buffers and bytes; bytes are exceeded first */
{60 * GST_MSECOND, 2, 1 * sizeof (guint), 1},
};
GST_START_TEST (test_buffering_limits)
{
const gboolean use_lists = __i__ % 2;
struct TestBufferingLimitsParams *param =
&test_buffering_limit_params[__i__ / 2];
GstAppSink *app_sink = GST_APP_SINK (setup_appsink ());
guint num_samples = 0;
GstSample *queued_sample;
GstBufferList *list;
gint j;
ASSERT_SET_STATE (app_sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
gst_app_sink_set_max_time (app_sink, param->max_time);
gst_app_sink_set_max_bytes (app_sink, param->max_bytes);
gst_app_sink_set_max_buffers (app_sink, param->max_buffers);
gst_app_sink_set_drop (app_sink, TRUE);
list = create_buffer_list_with_buffer_duration (20 * GST_MSECOND);
if (use_lists) {
gst_buffer_list_ref (list);
gst_pad_push_list (mysrcpad, list);
} else {
for (j = 0; j < gst_buffer_list_length (list); j++) {
GstBuffer *buf = gst_buffer_list_get (list, j);
gst_buffer_ref (buf);
gst_pad_push (mysrcpad, buf);
}
}
while ((queued_sample = gst_app_sink_try_pull_sample (app_sink, 0))) {
num_samples++;
gst_sample_unref (queued_sample);
}
fail_unless_equals_int (num_samples, param->expected_num_samples);
ASSERT_SET_STATE (app_sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (GST_ELEMENT_CAST (app_sink));
gst_buffer_list_unref (list);
}
GST_END_TEST;
static Suite *
appsink_suite (void)
{
@ -1183,6 +1270,8 @@ appsink_suite (void)
tcase_add_test (tc_chain, test_caps_before_flush_race_condition);
tcase_add_test (tc_chain, test_query_allocation_callback);
tcase_add_test (tc_chain, test_query_allocation_signals);
tcase_add_loop_test (tc_chain, test_buffering_limits, 0,
G_N_ELEMENTS (test_buffering_limit_params) * 2);
return s;
}