queue: Send the notify signals on queue level changes

This is documented as:

> you can query how many buffers are queued by reading the
> #gstqueue:current-level-buffers property. you can track changes
> by connecting to the notify::current-level-buffers signal (which
> like all signals will be emitted from the streaming thread). the same
> applies to the #gstqueue:current-level-time and
> #gstqueue:current-level-bytes properties.

... but was not implemented.

This also respects the `notify::silent` property for the notify signals
to be less intrusive.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7486>
This commit is contained in:
Thibault Saunier 2024-09-09 19:11:33 -03:00 committed by GStreamer Marge Bot
parent 319439a1c0
commit ba94af0285
3 changed files with 78 additions and 8 deletions

View file

@ -1905,6 +1905,18 @@
"type": "guint64", "type": "guint64",
"writable": true "writable": true
}, },
"notify-levels": {
"blurb": "Whether to emit `notify` signals on levels changes or not",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "playing",
"readable": true,
"type": "gboolean",
"writable": true
},
"silent": { "silent": {
"blurb": "Don't emit queue signals", "blurb": "Don't emit queue signals",
"conditionally-available": false, "conditionally-available": false,

View file

@ -36,9 +36,9 @@
* processing on sink and source pad. * processing on sink and source pad.
* *
* You can query how many buffers are queued by reading the * You can query how many buffers are queued by reading the
* #GstQueue:current-level-buffers property. You can track changes * #GstQueue:current-level-buffers property. If you set #queue:notify-levels to TRUE,
* by connecting to the notify::current-level-buffers signal (which * you can track changes by connecting to the notify::current-level-buffers signal
* like all signals will be emitted from the streaming thread). The same * (which like all signals will be emitted from the streaming thread). The same
* applies to the #GstQueue:current-level-time and * applies to the #GstQueue:current-level-time and
* #GstQueue:current-level-bytes properties. * #GstQueue:current-level-bytes properties.
* *
@ -123,6 +123,7 @@ enum
PROP_LEAKY, PROP_LEAKY,
PROP_SILENT, PROP_SILENT,
PROP_FLUSH_ON_EOS, PROP_FLUSH_ON_EOS,
PROP_NOTIFY_LEVELS,
PROP_LAST PROP_LAST
}; };
@ -147,6 +148,12 @@ GParamSpec *properties[PROP_LAST];
g_mutex_unlock (&q->qlock); \ g_mutex_unlock (&q->qlock); \
} G_STMT_END } G_STMT_END
#define GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS(q, prev_level) G_STMT_START { \
GstQueueSize new_level = queue->cur_level; \
g_mutex_unlock (&q->qlock); \
gst_queue_notify_levels (queue, &prev_level, &new_level); \
} G_STMT_END
#define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START { \ #define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START { \
STATUS (q, q->sinkpad, "wait for DEL"); \ STATUS (q, q->sinkpad, "wait for DEL"); \
q->waiting_del = TRUE; \ q->waiting_del = TRUE; \
@ -405,6 +412,20 @@ gst_queue_class_init (GstQueueClass * klass)
"Discard all data in the queue when an EOS event is received", FALSE, "Discard all data in the queue when an EOS event is received", FALSE,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS); G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
/**
* GstQueue:notify-levels
*
* Whether to emit `notify:property-name` signals on levels changes or not
*
* Default: %FALSE
*
* Since: 1.26
*/
properties[PROP_NOTIFY_LEVELS] =
g_param_spec_boolean ("notify-levels", "Notify-Levels",
"Whether to emit `notify` signals on levels changes or not", FALSE,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, PROP_LAST, properties); g_object_class_install_properties (gobject_class, PROP_LAST, properties);
gobject_class->finalize = gst_queue_finalize; gobject_class->finalize = gst_queue_finalize;
@ -746,6 +767,24 @@ apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
update_time_level (queue); update_time_level (queue);
} }
static void
gst_queue_notify_levels (GstQueue * queue, GstQueueSize * prev_level,
GstQueueSize * new_level)
{
if (!queue->notify_levels) {
return;
}
if (new_level->buffers != prev_level->buffers)
g_object_notify_by_pspec ((GObject *) queue,
properties[PROP_CUR_LEVEL_BUFFERS]);
if (new_level->bytes != prev_level->bytes)
g_object_notify_by_pspec ((GObject *) queue,
properties[PROP_CUR_LEVEL_BYTES]);
if (new_level->time != prev_level->time)
g_object_notify_by_pspec ((GObject *) queue,
properties[PROP_CUR_LEVEL_TIME]);
}
static void static void
gst_queue_locked_flush (GstQueue * queue, gboolean full) gst_queue_locked_flush (GstQueue * queue, gboolean full)
{ {
@ -1017,6 +1056,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
if (GST_EVENT_IS_SERIALIZED (event)) { if (GST_EVENT_IS_SERIALIZED (event)) {
/* serialized events go in the queue */ /* serialized events go in the queue */
GST_QUEUE_MUTEX_LOCK (queue); GST_QUEUE_MUTEX_LOCK (queue);
GstQueueSize prev_level = queue->cur_level;
/* STREAM_START and SEGMENT reset the EOS status of a /* STREAM_START and SEGMENT reset the EOS status of a
* pad. Change the cached sinkpad flow result accordingly */ * pad. Change the cached sinkpad flow result accordingly */
@ -1070,7 +1110,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
} }
gst_queue_locked_enqueue_event (queue, event); gst_queue_locked_enqueue_event (queue, event);
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level);
} else { } else {
/* non-serialized events are forwarded downstream immediately */ /* non-serialized events are forwarded downstream immediately */
ret = gst_pad_push_event (queue->srcpad, event); ret = gst_pad_push_event (queue->srcpad, event);
@ -1237,6 +1277,7 @@ gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
/* we have to lock the queue since we span threads */ /* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
GstQueueSize prev_level = queue->cur_level;
/* when we received EOS, we refuse any more data */ /* when we received EOS, we refuse any more data */
if (queue->eos) if (queue->eos)
goto out_eos; goto out_eos;
@ -1284,8 +1325,15 @@ gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
/* now we can clean up and exit right away */ /* now we can clean up and exit right away */
goto out_unref; goto out_unref;
case GST_QUEUE_LEAK_DOWNSTREAM: case GST_QUEUE_LEAK_DOWNSTREAM:
{
gst_queue_leak_downstream (queue); gst_queue_leak_downstream (queue);
if (!queue->silent) {
GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level);
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
}
break; break;
}
default: default:
g_warning ("Unknown leaky type, using default"); g_warning ("Unknown leaky type, using default");
/* fall-through */ /* fall-through */
@ -1340,14 +1388,14 @@ gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
gst_queue_locked_enqueue_buffer_list (queue, obj); gst_queue_locked_enqueue_buffer_list (queue, obj);
else else
gst_queue_locked_enqueue_buffer (queue, obj); gst_queue_locked_enqueue_buffer (queue, obj);
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level);
return GST_FLOW_OK; return GST_FLOW_OK;
/* special conditions */ /* special conditions */
out_unref: out_unref:
{ {
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level);
gst_mini_object_unref (obj); gst_mini_object_unref (obj);
@ -1587,12 +1635,14 @@ gst_queue_loop (GstPad * pad)
} }
} }
GstQueueSize prev_level = queue->cur_level;
ret = gst_queue_push_one (queue); ret = gst_queue_push_one (queue);
queue->srcresult = ret; queue->srcresult = ret;
if (ret != GST_FLOW_OK) if (ret != GST_FLOW_OK)
goto out_flushing; goto out_flushing;
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level);
return; return;
@ -1617,6 +1667,7 @@ out_flushing:
g_cond_signal (&queue->query_handled); g_cond_signal (&queue->query_handled);
} }
GST_QUEUE_MUTEX_UNLOCK (queue); GST_QUEUE_MUTEX_UNLOCK (queue);
/* let app know about us giving up if upstream is not expected to do so */ /* let app know about us giving up if upstream is not expected to do so */
/* EOS is already taken care of elsewhere */ /* EOS is already taken care of elsewhere */
if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) { if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
@ -1897,6 +1948,9 @@ gst_queue_set_property (GObject * object,
case PROP_FLUSH_ON_EOS: case PROP_FLUSH_ON_EOS:
queue->flush_on_eos = g_value_get_boolean (value); queue->flush_on_eos = g_value_get_boolean (value);
break; break;
case PROP_NOTIFY_LEVELS:
queue->notify_levels = g_value_get_boolean (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -1950,6 +2004,9 @@ gst_queue_get_property (GObject * object,
case PROP_FLUSH_ON_EOS: case PROP_FLUSH_ON_EOS:
g_value_set_boolean (value, queue->flush_on_eos); g_value_set_boolean (value, queue->flush_on_eos);
break; break;
case PROP_NOTIFY_LEVELS:
g_value_set_boolean (value, queue->notify_levels);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;

View file

@ -130,6 +130,7 @@ struct _GstQueue {
gboolean push_newsegment; gboolean push_newsegment;
gboolean silent; /* don't emit signals */ gboolean silent; /* don't emit signals */
gboolean notify_levels; /* emit 'notify' signals on level changes */
/* whether the first new segment has been applied to src */ /* whether the first new segment has been applied to src */
gboolean newseg_applied_to_src; gboolean newseg_applied_to_src;