diff --git a/subprojects/gstreamer/docs/plugins/gst_plugins_cache.json b/subprojects/gstreamer/docs/plugins/gst_plugins_cache.json index 33f8baa9ea..fd579549bf 100644 --- a/subprojects/gstreamer/docs/plugins/gst_plugins_cache.json +++ b/subprojects/gstreamer/docs/plugins/gst_plugins_cache.json @@ -1905,6 +1905,18 @@ "type": "guint64", "writable": true }, + "notify-levels": { + "blurb": "Whether to emit `notify` signals on levels changes or not", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "playing", + "readable": true, + "type": "gboolean", + "writable": true + }, "silent": { "blurb": "Don't emit queue signals", "conditionally-available": false, diff --git a/subprojects/gstreamer/plugins/elements/gstqueue.c b/subprojects/gstreamer/plugins/elements/gstqueue.c index 58aef4acbe..8a654cc2b0 100644 --- a/subprojects/gstreamer/plugins/elements/gstqueue.c +++ b/subprojects/gstreamer/plugins/elements/gstqueue.c @@ -36,9 +36,9 @@ * processing on sink and source pad. * * You can query how many buffers are queued by reading the - * #GstQueue:current-level-buffers property. You can track changes - * by connecting to the notify::current-level-buffers signal (which - * like all signals will be emitted from the streaming thread). The same + * #GstQueue:current-level-buffers property. If you set #queue:notify-levels to TRUE, + * you can track changes by connecting to the notify::current-level-buffers signal + * (which like all signals will be emitted from the streaming thread). The same * applies to the #GstQueue:current-level-time and * #GstQueue:current-level-bytes properties. * @@ -123,6 +123,7 @@ enum PROP_LEAKY, PROP_SILENT, PROP_FLUSH_ON_EOS, + PROP_NOTIFY_LEVELS, PROP_LAST }; @@ -147,6 +148,12 @@ GParamSpec *properties[PROP_LAST]; g_mutex_unlock (&q->qlock); \ } G_STMT_END +#define GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS(q, prev_level) G_STMT_START { \ + GstQueueSize new_level = queue->cur_level; \ + g_mutex_unlock (&q->qlock); \ + gst_queue_notify_levels (queue, &prev_level, &new_level); \ +} G_STMT_END + #define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START { \ STATUS (q, q->sinkpad, "wait for DEL"); \ q->waiting_del = TRUE; \ @@ -405,6 +412,20 @@ gst_queue_class_init (GstQueueClass * klass) "Discard all data in the queue when an EOS event is received", FALSE, G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS); + /** + * GstQueue:notify-levels + * + * Whether to emit `notify:property-name` signals on levels changes or not + * + * Default: %FALSE + * + * Since: 1.26 + */ + properties[PROP_NOTIFY_LEVELS] = + g_param_spec_boolean ("notify-levels", "Notify-Levels", + "Whether to emit `notify` signals on levels changes or not", FALSE, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS); + g_object_class_install_properties (gobject_class, PROP_LAST, properties); gobject_class->finalize = gst_queue_finalize; @@ -746,6 +767,24 @@ apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list, update_time_level (queue); } +static void +gst_queue_notify_levels (GstQueue * queue, GstQueueSize * prev_level, + GstQueueSize * new_level) +{ + if (!queue->notify_levels) { + return; + } + if (new_level->buffers != prev_level->buffers) + g_object_notify_by_pspec ((GObject *) queue, + properties[PROP_CUR_LEVEL_BUFFERS]); + if (new_level->bytes != prev_level->bytes) + g_object_notify_by_pspec ((GObject *) queue, + properties[PROP_CUR_LEVEL_BYTES]); + if (new_level->time != prev_level->time) + g_object_notify_by_pspec ((GObject *) queue, + properties[PROP_CUR_LEVEL_TIME]); +} + static void gst_queue_locked_flush (GstQueue * queue, gboolean full) { @@ -1017,6 +1056,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) if (GST_EVENT_IS_SERIALIZED (event)) { /* serialized events go in the queue */ GST_QUEUE_MUTEX_LOCK (queue); + GstQueueSize prev_level = queue->cur_level; /* STREAM_START and SEGMENT reset the EOS status of a * pad. Change the cached sinkpad flow result accordingly */ @@ -1070,7 +1110,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) } gst_queue_locked_enqueue_event (queue, event); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level); } else { /* non-serialized events are forwarded downstream immediately */ ret = gst_pad_push_event (queue->srcpad, event); @@ -1237,6 +1277,7 @@ gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent, /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + GstQueueSize prev_level = queue->cur_level; /* when we received EOS, we refuse any more data */ if (queue->eos) goto out_eos; @@ -1284,8 +1325,15 @@ gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent, /* now we can clean up and exit right away */ goto out_unref; case GST_QUEUE_LEAK_DOWNSTREAM: + { gst_queue_leak_downstream (queue); + + if (!queue->silent) { + GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level); + GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + } break; + } default: g_warning ("Unknown leaky type, using default"); /* fall-through */ @@ -1340,14 +1388,14 @@ gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent, gst_queue_locked_enqueue_buffer_list (queue, obj); else gst_queue_locked_enqueue_buffer (queue, obj); - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level); return GST_FLOW_OK; /* special conditions */ out_unref: { - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level); gst_mini_object_unref (obj); @@ -1587,12 +1635,14 @@ gst_queue_loop (GstPad * pad) } } + GstQueueSize prev_level = queue->cur_level; + ret = gst_queue_push_one (queue); queue->srcresult = ret; if (ret != GST_FLOW_OK) goto out_flushing; - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_QUEUE_MUTEX_UNLOCK_NOTIFY_LEVELS (queue, prev_level); return; @@ -1617,6 +1667,7 @@ out_flushing: g_cond_signal (&queue->query_handled); } GST_QUEUE_MUTEX_UNLOCK (queue); + /* let app know about us giving up if upstream is not expected to do so */ /* EOS is already taken care of elsewhere */ if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) { @@ -1897,6 +1948,9 @@ gst_queue_set_property (GObject * object, case PROP_FLUSH_ON_EOS: queue->flush_on_eos = g_value_get_boolean (value); break; + case PROP_NOTIFY_LEVELS: + queue->notify_levels = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1950,6 +2004,9 @@ gst_queue_get_property (GObject * object, case PROP_FLUSH_ON_EOS: g_value_set_boolean (value, queue->flush_on_eos); break; + case PROP_NOTIFY_LEVELS: + g_value_set_boolean (value, queue->notify_levels); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; diff --git a/subprojects/gstreamer/plugins/elements/gstqueue.h b/subprojects/gstreamer/plugins/elements/gstqueue.h index d1a434905d..12d8c1bd1b 100644 --- a/subprojects/gstreamer/plugins/elements/gstqueue.h +++ b/subprojects/gstreamer/plugins/elements/gstqueue.h @@ -129,7 +129,8 @@ struct _GstQueue { gboolean head_needs_discont, tail_needs_discont; gboolean push_newsegment; - gboolean silent; /* don't emit signals */ + gboolean silent; /* don't emit signals */ + gboolean notify_levels; /* emit 'notify' signals on level changes */ /* whether the first new segment has been applied to src */ gboolean newseg_applied_to_src;