aggregator: Add a timeout parameter to ::aggregate()

When this is TRUE, we really have to produce output. This happens
in live mixing mode when we have to output something for the current
time, no matter if we have enough input or not.
This commit is contained in:
Sebastian Dröge 2014-12-17 17:54:09 +01:00
parent e82ec36863
commit d4c4af699e
6 changed files with 26 additions and 16 deletions

View file

@ -486,12 +486,14 @@ gst_aggregator_get_next_time (GstAggregator * self)
/* called with the src STREAM lock */
static gboolean
_wait_and_check (GstAggregator * self)
_wait_and_check (GstAggregator * self, gboolean * timeout)
{
GstClockTime latency_max, latency_min;
GstClockTime start;
gboolean live;
*timeout = FALSE;
gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
if (gst_aggregator_iterate_sinkpads (self,
@ -561,6 +563,7 @@ _wait_and_check (GstAggregator * self)
/* we timed out */
if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
SRC_STREAM_UNLOCK (self);
*timeout = TRUE;
return TRUE;
}
}
@ -575,6 +578,7 @@ aggregate_func (GstAggregator * self)
{
GstAggregatorPrivate *priv = self->priv;
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
gboolean timeout = FALSE;
if (self->priv->running == FALSE) {
GST_DEBUG_OBJECT (self, "Not running anymore");
@ -583,12 +587,12 @@ aggregate_func (GstAggregator * self)
GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && priv->running) {
if (!_wait_and_check (self))
if (!_wait_and_check (self, &timeout))
continue;
GST_TRACE_OBJECT (self, "Actually aggregating!");
priv->flow_return = klass->aggregate (self);
priv->flow_return = klass->aggregate (self, timeout);
if (priv->flow_return == GST_FLOW_EOS) {
_push_eos (self);

View file

@ -237,7 +237,8 @@ struct _GstAggregatorClass {
GstPadMode mode,
gboolean active);
GstFlowReturn (*aggregate) (GstAggregator * aggregator);
GstFlowReturn (*aggregate) (GstAggregator * aggregator,
gboolean timeout);
gboolean (*stop) (GstAggregator * aggregator);

View file

@ -1230,7 +1230,7 @@ gst_videoaggregator_get_next_time (GstAggregator * agg)
}
static GstFlowReturn
gst_videoaggregator_aggregate (GstAggregator * agg)
gst_videoaggregator_aggregate (GstAggregator * agg, gboolean timeout)
{
GstFlowReturn ret;
GstVideoAggregator *vagg = GST_VIDEO_AGGREGATOR (agg);
@ -1275,7 +1275,7 @@ gst_videoaggregator_aggregate (GstAggregator * agg)
output_end_time);
}
if (res == GST_FLOW_NEEDS_DATA) {
if (res == GST_FLOW_NEEDS_DATA && !timeout) {
GST_DEBUG_OBJECT (vagg, "Need more data for decisions");
ret = GST_FLOW_OK;
goto done;

View file

@ -233,7 +233,8 @@ static void gst_audiomixer_release_pad (GstElement * element, GstPad * pad);
static GstFlowReturn
gst_audiomixer_do_clip (GstAggregator * agg,
GstAggregatorPad * bpad, GstBuffer * buffer, GstBuffer ** outbuf);
static GstFlowReturn gst_audiomixer_aggregate (GstAggregator * agg);
static GstFlowReturn gst_audiomixer_aggregate (GstAggregator * agg,
gboolean timeout);
static GstClockTime
gst_audiomixer_get_next_time (GstAggregator * agg)
@ -1327,7 +1328,7 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad,
}
static GstFlowReturn
gst_audiomixer_aggregate (GstAggregator * agg)
gst_audiomixer_aggregate (GstAggregator * agg, gboolean timeout)
{
/* Get all pads that have data for us and store them in a
* new list.
@ -1401,7 +1402,6 @@ gst_audiomixer_aggregate (GstAggregator * agg)
} else {
next_offset = audiomixer->offset - audiomixer->blocksize;
}
next_timestamp = gst_util_uint64_scale (next_offset, GST_SECOND, rate);
if (audiomixer->current_buffer) {
@ -1428,13 +1428,14 @@ gst_audiomixer_aggregate (GstAggregator * agg)
GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (iter->data);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (iter->data);
inbuf = gst_aggregator_pad_get_buffer (aggpad);
if (!inbuf)
continue;
g_assert (!pad->buffer || pad->buffer == inbuf);
/* New buffer? */
if (!pad->buffer || pad->buffer != inbuf) {
if (!pad->buffer) {
/* Takes ownership of buffer */
if (!gst_audio_mixer_fill_buffer (audiomixer, pad, inbuf)) {
dropped = TRUE;
@ -1451,11 +1452,13 @@ gst_audiomixer_aggregate (GstAggregator * agg)
}
/* At this point adata->output_offset >= audiomixer->offset or we have no buffer anymore */
g_assert (!pad->buffer || pad->output_offset >= audiomixer->offset);
if (pad->output_offset >= audiomixer->offset
&& pad->output_offset <
audiomixer->offset + audiomixer->blocksize && pad->buffer) {
GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset");
gst_audio_mixer_mix_buffer (audiomixer, pad, &outmap);
if (pad->output_offset >= next_offset) {
GST_DEBUG_OBJECT (pad,
"Pad is after current offset: %" G_GUINT64_FORMAT " >= %"
@ -1469,17 +1472,17 @@ gst_audiomixer_aggregate (GstAggregator * agg)
gst_buffer_unmap (outbuf, &outmap);
if (dropped) {
if (dropped && !timeout) {
/* We dropped a buffer, retry */
GST_INFO_OBJECT (audiomixer,
"A pad dropped a buffer, wait for the next one");
return GST_FLOW_OK;
}
if (!is_done && !is_eos) {
if (!is_done && !is_eos && !timeout) {
/* Get more buffers */
GST_INFO_OBJECT (audiomixer,
"We're not done yet for the current offset," " waiting for more data");
"We're not done yet for the current offset, waiting for more data");
return GST_FLOW_OK;
}
@ -1489,7 +1492,6 @@ gst_audiomixer_aggregate (GstAggregator * agg)
GST_DEBUG_OBJECT (audiomixer, "We're EOS");
GST_OBJECT_LOCK (agg);
for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) {
GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (iter->data);

View file

@ -280,6 +280,9 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad,
static GstAllocationParams params = { 0, 15, 0, 0, };
gint width, height;
if (!pad->buffer)
return TRUE;
if (!gst_video_frame_map (frame, &pad->buffer_vinfo, pad->buffer,
GST_MAP_READ)) {
GST_WARNING_OBJECT (vagg, "Could not map input buffer");

View file

@ -63,7 +63,7 @@ struct _GstTestAggregatorClass
};
static GstFlowReturn
gst_test_aggregator_aggregate (GstAggregator * aggregator)
gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
{
GstIterator *iter;
gboolean all_eos = TRUE;