aggregator: expose sample selection API

See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/771
for context.

This exposes new API that subclasses must call from their
aggregate() implementation to signal that they have selected
the next samples they will aggregate: gst_aggregator_selected_samples()

GstAggregator will emit a new signal there, `samples-selected`,
handlers can then look up samples per pad with the newly-added
gst_aggregator_peek_next_sample.

In addition, a new FIXME is logged when subclasses haven't actually
called `selected_samples` from their aggregate() implementation.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/549>
This commit is contained in:
Mathieu Duponchelle 2020-06-30 21:10:05 +02:00 committed by Sebastian Dröge
parent 411d255154
commit d74efc1aed
2 changed files with 115 additions and 2 deletions

View file

@ -297,6 +297,31 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
return TRUE; return TRUE;
} }
/**
* gst_aggregator_peek_next_sample:
*
* Use this function to determine what input buffers will be aggregated
* to produce the next output buffer. This should only be called from
* a #GstAggregator::samples-selected handler, and can be used to precisely
* control aggregating parameters for a given set of input samples.
*
* Returns: The sample that is about to be aggregated. It may hold a #GstBuffer
* or a #GstBufferList. The contents of its info structure is subclass-dependent,
* and documented on a subclass basis. The buffers held by the sample are
* not writable.
* Since: 1.18
*/
GstSample *
gst_aggregator_peek_next_sample (GstAggregator * agg, GstAggregatorPad * aggpad)
{
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (agg);
if (klass->peek_next_sample)
return (klass->peek_next_sample (agg, aggpad));
return NULL;
}
/************************************* /*************************************
* GstAggregator implementation * * GstAggregator implementation *
*************************************/ *************************************/
@ -339,6 +364,7 @@ struct _GstAggregatorPrivate
/* aggregate */ /* aggregate */
GstClockID aggregate_id; /* protected by src_lock */ GstClockID aggregate_id; /* protected by src_lock */
gboolean selected_samples_called_or_warned; /* protected by src_lock */
GMutex src_lock; GMutex src_lock;
GCond src_cond; GCond src_cond;
@ -354,6 +380,7 @@ struct _GstAggregatorPrivate
/* properties */ /* properties */
gint64 latency; /* protected by both src_lock and all pad locks */ gint64 latency; /* protected by both src_lock and all pad locks */
gboolean emit_signals;
}; };
/* Seek event forwarding helper */ /* Seek event forwarding helper */
@ -373,6 +400,7 @@ typedef struct
#define DEFAULT_MIN_UPSTREAM_LATENCY 0 #define DEFAULT_MIN_UPSTREAM_LATENCY 0
#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
#define DEFAULT_START_TIME (-1) #define DEFAULT_START_TIME (-1)
#define DEFAULT_EMIT_SIGNALS FALSE
enum enum
{ {
@ -381,9 +409,18 @@ enum
PROP_MIN_UPSTREAM_LATENCY, PROP_MIN_UPSTREAM_LATENCY,
PROP_START_TIME_SELECTION, PROP_START_TIME_SELECTION,
PROP_START_TIME, PROP_START_TIME,
PROP_EMIT_SIGNALS,
PROP_LAST PROP_LAST
}; };
enum
{
SIGNAL_SAMPLES_SELECTED,
LAST_SIGNAL,
};
static guint gst_aggregator_signals[LAST_SIGNAL] = { 0 };
static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
@ -1282,6 +1319,13 @@ gst_aggregator_aggregate_func (GstAggregator * self)
flow_return = klass->aggregate (self, timeout); flow_return = klass->aggregate (self, timeout);
} }
if (!priv->selected_samples_called_or_warned) {
GST_FIXME_OBJECT (self,
"Subclass should call gst_aggregator_selected_samples() from its "
"aggregate implementation.");
priv->selected_samples_called_or_warned = TRUE;
}
if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA) if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
continue; continue;
@ -2551,6 +2595,9 @@ gst_aggregator_set_property (GObject * object, guint prop_id,
case PROP_START_TIME: case PROP_START_TIME:
agg->priv->start_time = g_value_get_uint64 (value); agg->priv->start_time = g_value_get_uint64 (value);
break; break;
case PROP_EMIT_SIGNALS:
agg->priv->emit_signals = g_value_get_boolean (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -2578,6 +2625,9 @@ gst_aggregator_get_property (GObject * object, guint prop_id,
case PROP_START_TIME: case PROP_START_TIME:
g_value_set_uint64 (value, agg->priv->start_time); g_value_set_uint64 (value, agg->priv->start_time);
break; break;
case PROP_EMIT_SIGNALS:
g_value_set_boolean (value, agg->priv->emit_signals);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -2670,6 +2720,31 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
"Start time to use if start-time-selection=set", 0, "Start time to use if start-time-selection=set", 0,
G_MAXUINT64, G_MAXUINT64,
DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAggregator:emit-signals:
*
* Enables the emission of signals such as #GstAggregator::samples-selected
*
* Since: 1.18
*/
g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
g_param_spec_boolean ("emit-signals", "Emit signals",
"Send signals", DEFAULT_EMIT_SIGNALS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAggregator::samples-selected:
*
* Signals that the #GstAggregator subclass has selected the next set
* of input samples it will aggregate. Handlers may call
* gst_aggregator_peek_next_sample() at that point.
*
* Since: 1.18
*/
gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED] =
g_signal_new ("samples-selected", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
} }
static inline gpointer static inline gpointer
@ -3153,7 +3228,6 @@ static void
gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer) gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
{ {
pad->priv->num_buffers--; pad->priv->num_buffers--;
GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer);
if (buffer && pad->priv->emit_signals) { if (buffer && pad->priv->emit_signals) {
g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
0, buffer); 0, buffer);
@ -3526,3 +3600,26 @@ gst_aggregator_update_segment (GstAggregator * self, const GstSegment * segment)
self->priv->send_segment = TRUE; self->priv->send_segment = TRUE;
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
} }
/**
* gst_aggregator_selected_samples:
*
* Subclasses should call this when they have prepared the
* buffers they will aggregate for each of their sink pads, but
* before using any of the properties of the pads that govern
* *how* aggregation should be performed, for example z-index
* for video aggregators.
*
* Since: 1.18
*/
void
gst_aggregator_selected_samples (GstAggregator * self)
{
g_return_if_fail (GST_IS_AGGREGATOR (self));
if (self->priv->emit_signals) {
g_signal_emit (self, gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED], 0);
}
self->priv->selected_samples_called_or_warned = TRUE;
}

View file

@ -340,9 +340,18 @@ struct _GstAggregatorClass {
*/ */
GstFlowReturn (*finish_buffer_list) (GstAggregator * aggregator, GstFlowReturn (*finish_buffer_list) (GstAggregator * aggregator,
GstBufferList * bufferlist); GstBufferList * bufferlist);
/**
* GstAggregatorClass::peek_next_sample:
*
* See gst_aggregator_peek_next_sample().
*
* Since: 1.18
*/
GstSample * (*peek_next_sample) (GstAggregator *aggregator,
GstAggregatorPad * aggregator_pad);
/*< private >*/ /*< private >*/
gpointer _gst_reserved[GST_PADDING_LARGE-4]; gpointer _gst_reserved[GST_PADDING_LARGE-5];
}; };
/************************************ /************************************
@ -404,6 +413,13 @@ GST_BASE_API
void gst_aggregator_update_segment (GstAggregator * self, void gst_aggregator_update_segment (GstAggregator * self,
const GstSegment * segment); const GstSegment * segment);
GST_BASE_API
GstSample * gst_aggregator_peek_next_sample (GstAggregator *self,
GstAggregatorPad * pad);
GST_BASE_API
void gst_aggregator_selected_samples (GstAggregator * self);
/** /**
* GstAggregatorStartTimeSelection: * GstAggregatorStartTimeSelection:
* @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0. * @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0.