aggregator: expose API for ignoring inactive pads

An inactive pad is a pad which, in live mode, hasn't yet received
a first buffer, but has been waited on at least once.

Exposing API to support this behaviour allows users of aggregator
subclasses to request pads, and not start pushing data on those
immediately, while avoiding systematic timeouts.

Subclasses must check in explicitly to this behavior, most likely
by exposing a user-facing property, and must check whether a pad
needs ignoring when aggregating. That is because by design,
aggregator subclasses don't get a list of "ready" pads, but instead
directly iterate element->sinkpads.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/867>
This commit is contained in:
Mathieu Duponchelle 2021-10-18 15:56:31 +02:00 committed by GStreamer Marge Bot
parent 3809b9dca4
commit f829ed3313
2 changed files with 116 additions and 1 deletions

View file

@ -252,7 +252,10 @@ struct _GstAggregatorPadPrivate
guint32 last_flush_start_seqnum; guint32 last_flush_start_seqnum;
guint32 last_flush_stop_seqnum; guint32 last_flush_stop_seqnum;
/* Whether the pad hasn't received a first buffer yet */
gboolean first_buffer; gboolean first_buffer;
/* Whether we waited once for the pad's first buffer */
gboolean waited_once;
GQueue data; /* buffers, events and queries */ GQueue data; /* buffers, events and queries */
GstBuffer *clipped_buffer; GstBuffer *clipped_buffer;
@ -299,6 +302,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
aggpad->priv->time_level = 0; aggpad->priv->time_level = 0;
aggpad->priv->first_buffer = TRUE; aggpad->priv->first_buffer = TRUE;
aggpad->priv->waited_once = FALSE;
} }
static gboolean static gboolean
@ -400,6 +404,7 @@ struct _GstAggregatorPrivate
/* properties */ /* properties */
gint64 latency; /* protected by both src_lock and all pad locks */ gint64 latency; /* protected by both src_lock and all pad locks */
gboolean emit_signals; gboolean emit_signals;
gboolean ignore_inactive_pads;
}; };
/* Seek event forwarding helper */ /* Seek event forwarding helper */
@ -464,6 +469,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
GList *l, *sinkpads; GList *l, *sinkpads;
gboolean have_buffer = TRUE; gboolean have_buffer = TRUE;
gboolean have_event_or_query = FALSE; gboolean have_event_or_query = FALSE;
guint n_ready = 0;
GST_LOG_OBJECT (self, "checking pads"); GST_LOG_OBJECT (self, "checking pads");
@ -490,6 +496,12 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
break; break;
} }
if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live &&
pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) {
PAD_UNLOCK (pad);
continue;
}
/* Otherwise check if we have a clipped buffer or a buffer at the top of /* Otherwise check if we have a clipped buffer or a buffer at the top of
* the queue, and if not then this pad is not ready unless it is also EOS */ * the queue, and if not then this pad is not ready unless it is also EOS */
if (!pad->priv->clipped_buffer if (!pad->priv->clipped_buffer
@ -502,17 +514,24 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
* There's no point in waiting for buffers on EOS pads */ * There's no point in waiting for buffers on EOS pads */
if (!pad->priv->eos) if (!pad->priv->eos)
have_buffer = FALSE; have_buffer = FALSE;
else
n_ready++;
} else if (self->priv->peer_latency_live) { } else if (self->priv->peer_latency_live) {
/* In live mode, having a single pad with buffers is enough to /* In live mode, having a single pad with buffers is enough to
* generate a start time from it. In non-live mode all pads need * generate a start time from it. In non-live mode all pads need
* to have a buffer * to have a buffer
*/ */
self->priv->first_buffer = FALSE; self->priv->first_buffer = FALSE;
n_ready++;
} }
PAD_UNLOCK (pad); PAD_UNLOCK (pad);
} }
if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live
&& n_ready == 0)
goto no_sinkpads;
if (have_event_or_query) if (have_event_or_query)
goto pad_not_ready_but_event_or_query; goto pad_not_ready_but_event_or_query;
@ -863,6 +882,18 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
/* we timed out */ /* we timed out */
if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
GList *l;
GST_OBJECT_LOCK (self);
for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data);
PAD_LOCK (pad);
pad->priv->waited_once = TRUE;
PAD_UNLOCK (pad);
}
GST_OBJECT_UNLOCK (self);
SRC_UNLOCK (self); SRC_UNLOCK (self);
*timeout = TRUE; *timeout = TRUE;
return TRUE; return TRUE;
@ -1352,7 +1383,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
} }
if (timeout || flow_return >= GST_FLOW_OK) { if (timeout || flow_return >= GST_FLOW_OK) {
GST_TRACE_OBJECT (self, "Actually aggregating!"); GST_LOG_OBJECT (self, "Actually aggregating, timeout: %d", timeout);
flow_return = klass->aggregate (self, timeout); flow_return = klass->aggregate (self, timeout);
} }
@ -2846,6 +2877,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
priv->max_padserial = -1; priv->max_padserial = -1;
priv->tags_changed = FALSE; priv->tags_changed = FALSE;
priv->ignore_inactive_pads = FALSE;
self->priv->peer_latency_live = FALSE; self->priv->peer_latency_live = FALSE;
self->priv->peer_latency_min = self->priv->sub_latency_min = 0; self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
@ -3533,6 +3565,36 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
return is_eos; return is_eos;
} }
/**
* gst_aggregator_pad_is_inactive:
* @pad: an aggregator pad
*
* It is only valid to call this method from #GstAggregatorClass::aggregate()
*
* Returns: %TRUE if the pad is inactive, %FALSE otherwise.
* See gst_aggregator_ignore_inactive_pads() for more info.
* Since: 1.20
*/
gboolean
gst_aggregator_pad_is_inactive (GstAggregatorPad * pad)
{
GstAggregator *self;
gboolean inactive;
self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
g_assert_nonnull (self);
PAD_LOCK (pad);
inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live
&& pad->priv->first_buffer;
PAD_UNLOCK (pad);
gst_object_unref (self);
return inactive;
}
#if 0 #if 0
/* /*
* gst_aggregator_merge_tags: * gst_aggregator_merge_tags:
@ -3762,3 +3824,46 @@ gst_aggregator_selected_samples (GstAggregator * self,
self->priv->selected_samples_called_or_warned = TRUE; self->priv->selected_samples_called_or_warned = TRUE;
} }
/**
* gst_aggregator_set_ignore_inactive_pads:
* @ignore: whether inactive pads should not be waited on
*
* Subclasses should call this when they don't want to time out
* waiting for a pad that hasn't yet received any buffers in live
* mode.
*
* #GstAggregator will still wait once on each newly-added pad, making
* sure upstream has had a fair chance to start up.
*
* Since: 1.20
*/
void
gst_aggregator_set_ignore_inactive_pads (GstAggregator * self, gboolean ignore)
{
g_return_if_fail (GST_IS_AGGREGATOR (self));
GST_OBJECT_LOCK (self);
self->priv->ignore_inactive_pads = ignore;
GST_OBJECT_UNLOCK (self);
}
/**
* gst_aggregator_get_ignore_inactive_pads:
*
* Returns: whether inactive pads will not be waited on
* Since: 1.20
*/
gboolean
gst_aggregator_get_ignore_inactive_pads (GstAggregator * self)
{
gboolean ret;
g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE);
GST_OBJECT_LOCK (self);
ret = self->priv->ignore_inactive_pads;
GST_OBJECT_UNLOCK (self);
return ret;
}

View file

@ -121,6 +121,9 @@ gboolean gst_aggregator_pad_has_buffer (GstAggregatorPad * pad);
GST_BASE_API GST_BASE_API
gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad); gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad);
GST_BASE_API
gboolean gst_aggregator_pad_is_inactive (GstAggregatorPad * pad);
/********************* /*********************
* GstAggregator API * * GstAggregator API *
********************/ ********************/
@ -424,6 +427,13 @@ void gst_aggregator_selected_samples (GstAggregator
GstClockTime duration, GstClockTime duration,
GstStructure * info); GstStructure * info);
GST_BASE_API
void gst_aggregator_set_ignore_inactive_pads (GstAggregator * self,
gboolean ignore);
GST_BASE_API
gboolean gst_aggregator_get_ignore_inactive_pads (GstAggregator * self);
/** /**
* GstAggregatorStartTimeSelection: * GstAggregatorStartTimeSelection:
* @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0. * @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0.