aggregator: Implement force_live API

Setting force_live lets aggregator behave as if it had at least one of
its sinks connected to a live source, which should let us get rid of the
fake live test source hack that is probably present in dozens of
applications by now.

+ Expose API for subclasses to set and get force_live
+ Expose force-live properties in GstVideoAggregator and GstAudioAggregator
+ Adds a simple test

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3008>
This commit is contained in:
Mathieu Duponchelle 2022-11-18 14:24:30 +01:00 committed by GStreamer Marge Bot
parent f29c19be58
commit b5cd758230
5 changed files with 193 additions and 17 deletions

View file

@ -537,6 +537,7 @@ static GstSample *gst_audio_aggregator_peek_next_sample (GstAggregator * agg,
#define DEFAULT_DISCONT_WAIT (1 * GST_SECOND)
#define DEFAULT_OUTPUT_BUFFER_DURATION_N (1)
#define DEFAULT_OUTPUT_BUFFER_DURATION_D (100)
#define DEFAULT_FORCE_LIVE FALSE
enum
{
@ -546,6 +547,7 @@ enum
PROP_DISCONT_WAIT,
PROP_OUTPUT_BUFFER_DURATION_FRACTION,
PROP_IGNORE_INACTIVE_PADS,
PROP_FORCE_LIVE,
};
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GstAudioAggregator, gst_audio_aggregator,
@ -728,6 +730,23 @@ gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass)
"Ignore inactive pads",
"Avoid timing out waiting for inactive pads", FALSE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAudioAggregator:force-live:
*
* Causes the element to aggregate on a timeout even when no live source is
* connected to its sinks. See #GstAggregator:min-upstream-latency for a
* companion property: in the vast majority of cases where you plan to plug in
* live sources with a non-zero latency, you should set it to a non-zero value.
*
* Since: 1.22
*/
g_object_class_install_property (gobject_class, PROP_FORCE_LIVE,
g_param_spec_boolean ("force-live", "Force live",
"Always operate in live mode and aggregate on timeout regardless of "
"whether any live sources are linked upstream",
DEFAULT_FORCE_LIVE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY));
}
static void
@ -797,6 +816,10 @@ gst_audio_aggregator_set_property (GObject * object, guint prop_id,
gst_aggregator_set_ignore_inactive_pads (GST_AGGREGATOR (object),
g_value_get_boolean (value));
break;
case PROP_FORCE_LIVE:
gst_aggregator_set_force_live (GST_AGGREGATOR (object),
g_value_get_boolean (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -829,6 +852,10 @@ gst_audio_aggregator_get_property (GObject * object, guint prop_id,
g_value_set_boolean (value,
gst_aggregator_get_ignore_inactive_pads (GST_AGGREGATOR (object)));
break;
case PROP_FORCE_LIVE:
g_value_set_boolean (value,
gst_aggregator_get_force_live (GST_AGGREGATOR (object)));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -2195,7 +2222,7 @@ gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout)
gint64 next_timestamp;
gint rate, bpf;
gboolean dropped = FALSE;
gboolean is_eos = TRUE;
gboolean is_eos = !gst_aggregator_get_force_live (agg);
gboolean is_done = TRUE;
guint blocksize;
GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);

View file

@ -955,7 +955,13 @@ static void
g_thread_self()); \
} G_STMT_END
enum
{
PROP_0,
PROP_FORCE_LIVE,
};
#define DEFAULT_FORCE_LIVE FALSE
/* Can't use the G_DEFINE_TYPE macros because we need the
* videoaggregator class in the _init to be able to set
@ -1717,7 +1723,7 @@ gst_video_aggregator_fill_queues (GstVideoAggregator * vagg,
GstClockTime output_end_running_time, gboolean timeout)
{
GList *l;
gboolean eos = TRUE;
gboolean eos = !gst_aggregator_get_force_live (GST_AGGREGATOR (vagg));
gboolean repeat_pad_eos = FALSE;
gboolean has_no_repeat_pads = FALSE;
gboolean need_more_data = FALSE;
@ -3002,6 +3008,10 @@ gst_video_aggregator_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
switch (prop_id) {
case PROP_FORCE_LIVE:
g_value_set_boolean (value,
gst_aggregator_get_force_live (GST_AGGREGATOR (object)));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -3013,6 +3023,10 @@ gst_video_aggregator_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
switch (prop_id) {
case PROP_FORCE_LIVE:
gst_aggregator_set_force_live (GST_AGGREGATOR (object),
g_value_get_boolean (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -3070,6 +3084,23 @@ gst_video_aggregator_class_init (GstVideoAggregatorClass * klass)
/* Register the pad class */
g_type_class_ref (GST_TYPE_VIDEO_AGGREGATOR_PAD);
/**
* GstVideoAggregator:force-live:
*
* Causes the element to aggregate on a timeout even when no live source is
* connected to its sinks. See #GstAggregator:min-upstream-latency for a
* companion property: in the vast majority of cases where you plan to plug in
* live sources with a non-zero latency, you should set it to a non-zero value.
*
* Since: 1.22
*/
g_object_class_install_property (gobject_class, PROP_FORCE_LIVE,
g_param_spec_boolean ("force-live", "Force live",
"Always operate in live mode and aggregate on timeout regardless of "
"whether any live sources are linked upstream",
DEFAULT_FORCE_LIVE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY));
}
static void

View file

@ -371,6 +371,7 @@ struct _GstAggregatorPrivate
gboolean send_segment;
gboolean flushing;
gboolean send_eos; /* protected by srcpad stream lock */
gboolean got_eos_event; /* protected by srcpad stream lock */
GstCaps *srccaps; /* protected by the srcpad stream lock */
@ -409,8 +410,16 @@ struct _GstAggregatorPrivate
gint64 latency; /* protected by both src_lock and all pad locks */
gboolean emit_signals;
gboolean ignore_inactive_pads;
gboolean force_live; /* Construct only, doesn't need any locking */
};
/* With SRC_LOCK */
static gboolean
is_live_unlocked (GstAggregator * self)
{
return self->priv->peer_latency_live || self->priv->force_live;
}
/* Seek event forwarding helper */
typedef struct
{
@ -429,6 +438,7 @@ typedef struct
#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
#define DEFAULT_START_TIME (-1)
#define DEFAULT_EMIT_SIGNALS FALSE
#define DEFAULT_FORCE_LIVE FALSE
enum
{
@ -501,7 +511,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
break;
}
if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live &&
if (self->priv->ignore_inactive_pads && is_live_unlocked (self) &&
pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) {
PAD_UNLOCK (pad);
GST_LOG_OBJECT (pad, "Ignoring inactive pad");
@ -528,7 +538,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
} else {
GST_TRACE_OBJECT (pad, "Have %" GST_TIME_FORMAT " queued in %u buffers",
GST_TIME_ARGS (pad->priv->time_level), pad->priv->num_buffers);
if (self->priv->peer_latency_live) {
if (is_live_unlocked (self)) {
/* In live mode, having a single pad with buffers is enough to
* generate a start time from it. In non-live mode all pads need
* to have a buffer
@ -541,7 +551,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
PAD_UNLOCK (pad);
}
if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live
if (self->priv->ignore_inactive_pads && is_live_unlocked (self)
&& n_ready == 0)
goto no_sinkpads;
@ -1388,10 +1398,15 @@ gst_aggregator_aggregate_func (GstAggregator * self)
if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
goto handle_error;
if (self->priv->peer_latency_live)
if (is_live_unlocked (self))
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
gst_aggregator_pad_skip_buffers, NULL);
if (self->priv->got_eos_event) {
gst_aggregator_push_eos (self);
continue;
}
/* Ensure we have buffers ready (either in clipped_buffer or at the head of
* the queue */
if (!gst_aggregator_wait_and_check (self, &timeout)) {
@ -1478,6 +1493,7 @@ gst_aggregator_start (GstAggregator * self)
self->priv->send_stream_start = TRUE;
self->priv->send_segment = TRUE;
self->priv->send_eos = TRUE;
self->priv->got_eos_event = FALSE;
self->priv->srccaps = NULL;
self->priv->has_peer_latency = FALSE;
@ -1699,6 +1715,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
event = NULL;
SRC_LOCK (self);
priv->send_eos = TRUE;
priv->got_eos_event = FALSE;
SRC_BROADCAST (self);
SRC_UNLOCK (self);
@ -1977,6 +1994,14 @@ gst_aggregator_change_state (GstElement * element, GstStateChange transition)
SRC_LOCK (self);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
if (self->priv->force_live) {
ret = GST_STATE_CHANGE_NO_PREROLL;
}
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (self->priv->force_live) {
ret = GST_STATE_CHANGE_NO_PREROLL;
}
break;
default:
break;
@ -2195,12 +2220,18 @@ gst_aggregator_get_latency_unlocked (GstAggregator * self)
ret = gst_aggregator_query_latency_unlocked (self, query);
gst_query_unref (query);
if (!ret)
/* If we've been set to live, we don't wait for a peer latency, we will
* simply query it again next time around */
if (!ret && !self->priv->force_live)
return GST_CLOCK_TIME_NONE;
}
if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
return GST_CLOCK_TIME_NONE;
/* If we've been set to live, we don't wait for a peer latency, we will
* simply query it again next time around */
if (!self->priv->force_live) {
if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
return GST_CLOCK_TIME_NONE;
}
/* latency_min is never GST_CLOCK_TIME_NONE by construction */
latency = self->priv->peer_latency_min;
@ -2262,8 +2293,17 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
}
GST_STATE_UNLOCK (element);
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
SRC_LOCK (self);
self->priv->got_eos_event = TRUE;
SRC_BROADCAST (self);
SRC_UNLOCK (self);
}
return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
event);
}
@ -2627,6 +2667,16 @@ flushing:
return FALSE;
}
static void
gst_aggregator_constructed (GObject * object)
{
GstAggregator *agg = GST_AGGREGATOR (object);
if (agg->priv->force_live) {
GST_OBJECT_FLAG_SET (agg, GST_ELEMENT_FLAG_SOURCE);
}
}
static void
gst_aggregator_finalize (GObject * object)
{
@ -2818,6 +2868,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
gobject_class->set_property = gst_aggregator_set_property;
gobject_class->get_property = gst_aggregator_get_property;
gobject_class->constructed = gst_aggregator_constructed;
gobject_class->finalize = gst_aggregator_finalize;
g_object_class_install_property (gobject_class, PROP_LATENCY,
@ -2955,6 +3006,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
self->priv->latency = DEFAULT_LATENCY;
self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
self->priv->start_time = DEFAULT_START_TIME;
self->priv->force_live = DEFAULT_FORCE_LIVE;
g_mutex_init (&self->priv->src_lock);
g_cond_init (&self->priv->src_cond);
@ -3007,7 +3059,7 @@ gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
/* We also want at least two buffers, one is being processed and one is ready
* for the next iteration when we operate in live mode. */
if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
if (is_live_unlocked (self) && aggpad->priv->num_buffers < 2)
return TRUE;
/* On top of our latency, we also want to allow buffering up to the
@ -3648,7 +3700,7 @@ gst_aggregator_pad_is_inactive (GstAggregatorPad * pad)
g_assert_nonnull (self);
PAD_LOCK (pad);
inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live
inactive = self->priv->ignore_inactive_pads && is_live_unlocked (self)
&& pad->priv->first_buffer;
PAD_UNLOCK (pad);
@ -3933,3 +3985,33 @@ gst_aggregator_get_ignore_inactive_pads (GstAggregator * self)
return ret;
}
/**
* gst_aggregator_get_force_live:
*
* Subclasses may use the return value to inform whether they should return
* %GST_FLOW_EOS from their aggregate implementation.
*
* Returns: whether live status was forced on @self.
*
* Since: 1.22
*/
gboolean
gst_aggregator_get_force_live (GstAggregator * self)
{
return self->priv->force_live;
}
/**
* gst_aggregator_set_force_live:
*
* Subclasses should call this at construction time in order for @self to
* aggregate on a timeout even when no live source is connected.
*
* Since: 1.22
*/
void
gst_aggregator_set_force_live (GstAggregator * self, gboolean force_live)
{
self->priv->force_live = force_live;
}

View file

@ -434,6 +434,13 @@ void gst_aggregator_set_ignore_inactive_pads (GstAggregator * self,
GST_BASE_API
gboolean gst_aggregator_get_ignore_inactive_pads (GstAggregator * self);
GST_BASE_API
gboolean gst_aggregator_get_force_live (GstAggregator *self);
GST_BASE_API
void gst_aggregator_set_force_live (GstAggregator *self,
gboolean force_live);
/**
* GstAggregatorStartTimeSelection:
* @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0.

View file

@ -26,6 +26,7 @@
#include <stdlib.h>
#include <gst/check/gstcheck.h>
#include <gst/check/gstharness.h>
#include <gst/base/gstaggregator.h>
/* dummy aggregator based element */
@ -143,10 +144,12 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
}
gst_iterator_free (iter);
if (all_eos == TRUE) {
GST_INFO_OBJECT (testagg, "no data available, must be EOS");
gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
return GST_FLOW_EOS;
if (!gst_aggregator_get_force_live (aggregator)) {
if (all_eos == TRUE) {
GST_INFO_OBJECT (testagg, "no data available, must be EOS");
gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
return GST_FLOW_EOS;
}
}
buf = gst_buffer_new ();
@ -188,6 +191,8 @@ gst_test_aggregator_class_init (GstTestAggregatorClass * klass)
base_aggregator_class->aggregate =
GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate);
base_aggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
}
static void
@ -846,8 +851,8 @@ GST_START_TEST (test_flushing_seek)
GST_BUFFER_TIMESTAMP (buf) = 0;
_chain_data_init (&data2, test.aggregator, buf, NULL);
gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test.
aggregator)->srcpad)->segment, GST_FORMAT_TIME);
gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test.aggregator)->
srcpad)->segment, GST_FORMAT_TIME);
/* now do a successful flushing seek */
event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
@ -1353,6 +1358,29 @@ GST_START_TEST (test_remove_pad_on_aggregate)
GST_END_TEST;
GST_START_TEST (test_force_live)
{
GstElement *agg;
GstHarness *h;
GstBuffer *buf;
agg = gst_check_setup_element ("testaggregator");
g_object_set (agg, "latency", GST_USECOND, NULL);
gst_aggregator_set_force_live (GST_AGGREGATOR (agg), TRUE);
h = gst_harness_new_with_element (agg, NULL, "src");
gst_harness_play (h);
gst_harness_crank_single_clock_wait (h);
buf = gst_harness_pull (h);
gst_buffer_unref (buf);
gst_harness_teardown (h);
gst_object_unref (agg);
}
GST_END_TEST;
static Suite *
gst_aggregator_suite (void)
{
@ -1382,6 +1410,7 @@ gst_aggregator_suite (void)
tcase_add_test (general, test_change_state_intensive);
tcase_add_test (general, test_flush_on_aggregate);
tcase_add_test (general, test_remove_pad_on_aggregate);
tcase_add_test (general, test_force_live);
return suite;
}