omx: Add timeout to the flush operation and move buffer populating to a separate function

This commit is contained in:
Sebastian Dröge 2013-03-07 11:11:58 +01:00
parent 001b7f0ed9
commit 7a1eaec3b9
5 changed files with 162 additions and 122 deletions

View file

@ -720,7 +720,8 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state)
comp->pending_state = state;
/* Reset some things */
if (old_state == OMX_StateExecuting && state < old_state) {
if ((old_state == OMX_StateExecuting || old_state == OMX_StatePause)
&& state < old_state) {
g_list_free (comp->pending_reconfigure_outports);
comp->pending_reconfigure_outports = NULL;
/* Notify all inports that are still waiting */
@ -1346,7 +1347,8 @@ done:
/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
gst_omx_port_set_flushing (GstOMXPort * port, GstClockTime timeout,
gboolean flush)
{
GstOMXComponent *comp;
OMX_ERRORTYPE err = OMX_ErrorNone;
@ -1374,18 +1376,9 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
goto done;
}
if (comp->state != OMX_StateIdle && comp->state != OMX_StateExecuting
&& comp->state != OMX_StatePause) {
GST_DEBUG_OBJECT (comp->parent, "Component is in wrong state: %d",
comp->state);
err = OMX_ErrorUndefined;
goto done;
}
port->flushing = flush;
if (flush) {
gint64 wait_until;
gint64 wait_until = -1;
gboolean signalled;
OMX_ERRORTYPE last_error;
@ -1415,8 +1408,23 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
goto done;
}
wait_until = g_get_monotonic_time () + 5 * G_TIME_SPAN_SECOND;
GST_DEBUG_OBJECT (comp->parent, "Waiting for 5s");
if (timeout != GST_CLOCK_TIME_NONE) {
gint64 add = timeout / (GST_SECOND / G_TIME_SPAN_SECOND);
if (add == 0) {
if (!port->flushed || (port->buffers
&& port->buffers->len >
g_queue_get_length (&port->pending_buffers)))
err = OMX_ErrorTimeout;
goto done;
}
wait_until = g_get_monotonic_time () + add;
GST_DEBUG_OBJECT (comp->parent, "Waiting for %" G_GINT64_FORMAT "us",
add);
} else {
GST_DEBUG_OBJECT (comp->parent, "Waiting for signal");
}
/* Retry until timeout or until an error happend or
* until all buffers were released by the component and
@ -1429,12 +1437,19 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
&& port->buffers->len > g_queue_get_length (&port->pending_buffers)) {
g_mutex_lock (&comp->messages_lock);
g_mutex_unlock (&comp->lock);
if (!g_queue_is_empty (&comp->messages))
if (!g_queue_is_empty (&comp->messages)) {
signalled = TRUE;
else
}
if (wait_until == -1) {
g_cond_wait (&comp->messages_cond, &comp->messages_lock);
signalled = TRUE;
} else {
signalled =
g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
wait_until);
}
g_mutex_unlock (&comp->messages_lock);
g_mutex_lock (&comp->lock);
@ -1458,36 +1473,6 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
err = OMX_ErrorTimeout;
goto done;
}
} else {
if (port->port_def.eDir == OMX_DirOutput && port->buffers) {
GstOMXBuffer *buf;
/* Enqueue all buffers for the component to fill */
while ((buf = g_queue_pop_head (&port->pending_buffers))) {
if (!buf)
continue;
g_assert (!buf->used);
/* Reset all flags, some implementations don't
* reset them themselves and the flags are not
* valid anymore after the buffer was consumed
*/
buf->omx_buf->nFlags = 0;
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
"Failed to pass buffer %p (%p) to port %u: %s (0x%08x)", buf,
buf->omx_buf->pBuffer, port->index,
gst_omx_error_to_string (err), err);
goto done;
}
GST_DEBUG_OBJECT (comp->parent, "Passed buffer %p (%p) to component",
buf, buf->omx_buf->pBuffer);
}
}
}
done:
@ -1967,6 +1952,86 @@ gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled)
return err;
}
static OMX_ERRORTYPE
gst_omx_port_populate_unlocked (GstOMXPort * port)
{
GstOMXComponent *comp;
OMX_ERRORTYPE err = OMX_ErrorNone;
GstOMXBuffer *buf;
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
comp = port->comp;
GST_DEBUG_OBJECT (comp->parent, "Populating port %d", port->index);
gst_omx_component_handle_messages (comp);
if (port->flushing) {
GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing", port->index);
err = OMX_ErrorIncorrectStateOperation;
goto done;
}
if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
goto done;
}
if (port->port_def.eDir == OMX_DirOutput && port->buffers && !port->tunneled) {
/* Enqueue all buffers for the component to fill */
while ((buf = g_queue_pop_head (&port->pending_buffers))) {
if (!buf)
continue;
g_assert (!buf->used);
/* Reset all flags, some implementations don't
* reset them themselves and the flags are not
* valid anymore after the buffer was consumed
*/
buf->omx_buf->nFlags = 0;
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
"Failed to pass buffer %p (%p) to port %u: %s (0x%08x)", buf,
buf->omx_buf->pBuffer, port->index, gst_omx_error_to_string (err),
err);
goto done;
}
GST_DEBUG_OBJECT (comp->parent, "Passed buffer %p (%p) to component",
buf, buf->omx_buf->pBuffer);
}
}
done:
gst_omx_port_update_port_definition (port, NULL);
GST_DEBUG_OBJECT (comp->parent, "Populated port %u: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);
gst_omx_component_handle_messages (comp);
return err;
}
/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_populate (GstOMXPort * port)
{
OMX_ERRORTYPE err;
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
g_mutex_lock (&port->comp->lock);
err = gst_omx_port_populate_unlocked (port);
g_mutex_unlock (&port->comp->lock);
return err;
}
/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */
static OMX_ERRORTYPE
gst_omx_port_wait_enabled_unlocked (GstOMXPort * port, GstClockTime timeout)
@ -2061,39 +2126,6 @@ gst_omx_port_wait_enabled_unlocked (GstOMXPort * port, GstClockTime timeout)
} else {
if (enabled)
port->flushing = FALSE;
/* If everything went fine and we have an output port we
* should provide all newly allocated buffers to the port
*/
if (enabled && port->port_def.eDir == OMX_DirOutput && !port->tunneled) {
GstOMXBuffer *buf;
/* Enqueue all buffers for the component to fill */
while ((buf = g_queue_pop_head (&port->pending_buffers))) {
if (!buf)
continue;
g_assert (!buf->used);
/* Reset all flags, some implementations don't
* reset them themselves and the flags are not
* valid anymore after the buffer was consumed
*/
buf->omx_buf->nFlags = 0;
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
"Failed to pass buffer %p (%p) to port %u: %s (0x%08x)", buf,
buf->omx_buf->pBuffer, port->index, gst_omx_error_to_string (err),
err);
goto done;
}
GST_DEBUG_OBJECT (comp->parent, "Passed buffer %p (%p) to component",
buf, buf->omx_buf->pBuffer);
}
}
}
gst_omx_component_handle_messages (comp);

View file

@ -292,13 +292,14 @@ OMX_ERRORTYPE gst_omx_port_update_port_definition (GstOMXPort *port, OMX_PAR
GstOMXAcquireBufferReturn gst_omx_port_acquire_buffer (GstOMXPort *port, GstOMXBuffer **buf);
OMX_ERRORTYPE gst_omx_port_release_buffer (GstOMXPort *port, GstOMXBuffer *buf);
OMX_ERRORTYPE gst_omx_port_set_flushing (GstOMXPort *port, gboolean flush);
OMX_ERRORTYPE gst_omx_port_set_flushing (GstOMXPort *port, GstClockTime timeout, gboolean flush);
gboolean gst_omx_port_is_flushing (GstOMXPort *port);
OMX_ERRORTYPE gst_omx_port_allocate_buffers (GstOMXPort *port);
OMX_ERRORTYPE gst_omx_port_use_buffers (GstOMXPort *port, const GList *buffers);
OMX_ERRORTYPE gst_omx_port_use_eglimages (GstOMXPort *port, const GList *images);
OMX_ERRORTYPE gst_omx_port_deallocate_buffers (GstOMXPort *port);
OMX_ERRORTYPE gst_omx_port_populate (GstOMXPort *port);
OMX_ERRORTYPE gst_omx_port_wait_buffers_released (GstOMXPort * port, GstClockTime timeout);
OMX_ERRORTYPE gst_omx_port_mark_reconfigured (GstOMXPort * port);

View file

@ -221,10 +221,6 @@ gst_omx_audio_enc_change_state (GstElement * element, GstStateChange transition)
ret = GST_STATE_CHANGE_FAILURE;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (self->enc_in_port)
gst_omx_port_set_flushing (self->enc_in_port, FALSE);
if (self->enc_out_port)
gst_omx_port_set_flushing (self->enc_out_port, FALSE);
self->downstream_flow_ret = GST_FLOW_OK;
self->draining = FALSE;
@ -234,9 +230,9 @@ gst_omx_audio_enc_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (self->enc_in_port)
gst_omx_port_set_flushing (self->enc_in_port, TRUE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, TRUE);
if (self->enc_out_port)
gst_omx_port_set_flushing (self->enc_out_port, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, TRUE);
g_mutex_lock (&self->drain_lock);
self->draining = FALSE;
@ -362,6 +358,10 @@ gst_omx_audio_enc_loop (GstOMXAudioEnc * self)
if (err != OMX_ErrorNone)
goto reconfigure_error;
err = gst_omx_port_populate (port);
if (err != OMX_ErrorNone)
goto reconfigure_error;
err = gst_omx_port_mark_reconfigured (port);
if (err != OMX_ErrorNone)
goto reconfigure_error;
@ -577,8 +577,8 @@ gst_omx_audio_enc_stop (GstAudioEncoder * encoder)
GST_DEBUG_OBJECT (self, "Stopping encoder");
gst_omx_port_set_flushing (self->enc_in_port, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, TRUE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, TRUE);
gst_pad_stop_task (GST_AUDIO_ENCODER_SRC_PAD (encoder));
@ -770,8 +770,13 @@ gst_omx_audio_enc_set_format (GstAudioEncoder * encoder, GstAudioInfo * info)
}
/* Unset flushing to allow ports to accept data again */
gst_omx_port_set_flushing (self->enc_in_port, FALSE);
gst_omx_port_set_flushing (self->enc_out_port, FALSE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, FALSE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, FALSE);
/* Populate outport with buffers if we any */
if (!needs_disable)
if (gst_omx_port_populate (self->enc_out_port) != OMX_ErrorNone)
return FALSE;
if (gst_omx_component_get_last_error (self->enc) != OMX_ErrorNone) {
GST_ERROR_OBJECT (self, "Component in error state: %s (0x%08x)",
@ -800,8 +805,8 @@ gst_omx_audio_enc_flush (GstAudioEncoder * encoder)
gst_omx_audio_enc_drain (self);
gst_omx_port_set_flushing (self->enc_in_port, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, TRUE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, TRUE);
/* Wait until the srcpad loop is finished */
GST_AUDIO_ENCODER_STREAM_UNLOCK (self);
@ -809,8 +814,9 @@ gst_omx_audio_enc_flush (GstAudioEncoder * encoder)
GST_PAD_STREAM_UNLOCK (GST_AUDIO_ENCODER_SRC_PAD (self));
GST_AUDIO_ENCODER_STREAM_LOCK (self);
gst_omx_port_set_flushing (self->enc_in_port, FALSE);
gst_omx_port_set_flushing (self->enc_out_port, FALSE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, FALSE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, FALSE);
gst_omx_port_populate (self->enc_out_port);
/* Start the srcpad loop again */
self->last_upstream_ts = 0;

View file

@ -251,10 +251,6 @@ gst_omx_video_dec_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (self->dec_in_port)
gst_omx_port_set_flushing (self->dec_in_port, FALSE);
if (self->dec_out_port)
gst_omx_port_set_flushing (self->dec_out_port, FALSE);
self->downstream_flow_ret = GST_FLOW_OK;
self->draining = FALSE;
self->started = FALSE;
@ -263,9 +259,9 @@ gst_omx_video_dec_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (self->dec_in_port)
gst_omx_port_set_flushing (self->dec_in_port, TRUE);
gst_omx_port_set_flushing (self->dec_in_port, 5 * GST_SECOND, TRUE);
if (self->dec_out_port)
gst_omx_port_set_flushing (self->dec_out_port, TRUE);
gst_omx_port_set_flushing (self->dec_out_port, 5 * GST_SECOND, TRUE);
g_mutex_lock (&self->drain_lock);
self->draining = FALSE;
@ -879,8 +875,8 @@ gst_omx_video_dec_stop (GstVideoDecoder * decoder)
GST_DEBUG_OBJECT (self, "Stopping decoder");
gst_omx_port_set_flushing (self->dec_in_port, TRUE);
gst_omx_port_set_flushing (self->dec_out_port, TRUE);
gst_omx_port_set_flushing (self->dec_in_port, 5 * GST_SECOND, TRUE);
gst_omx_port_set_flushing (self->dec_out_port, 5 * GST_SECOND, TRUE);
gst_pad_stop_task (GST_VIDEO_DECODER_SRC_PAD (decoder));
@ -1238,8 +1234,8 @@ gst_omx_video_dec_set_format (GstVideoDecoder * decoder,
}
/* Unset flushing to allow ports to accept data again */
gst_omx_port_set_flushing (self->dec_in_port, FALSE);
gst_omx_port_set_flushing (self->dec_out_port, FALSE);
gst_omx_port_set_flushing (self->dec_in_port, 5 * GST_SECOND, FALSE);
gst_omx_port_set_flushing (self->dec_out_port, 5 * GST_SECOND, FALSE);
if (gst_omx_component_get_last_error (self->dec) != OMX_ErrorNone) {
GST_ERROR_OBJECT (self, "Component in error state: %s (0x%08x)",
@ -1269,8 +1265,8 @@ gst_omx_video_dec_reset (GstVideoDecoder * decoder, gboolean hard)
GST_DEBUG_OBJECT (self, "Resetting decoder");
gst_omx_port_set_flushing (self->dec_in_port, TRUE);
gst_omx_port_set_flushing (self->dec_out_port, TRUE);
gst_omx_port_set_flushing (self->dec_in_port, 5 * GST_SECOND, TRUE);
gst_omx_port_set_flushing (self->dec_out_port, 5 * GST_SECOND, TRUE);
/* Wait until the srcpad loop is finished,
* unlock GST_VIDEO_DECODER_STREAM_LOCK to prevent deadlocks
@ -1280,8 +1276,8 @@ gst_omx_video_dec_reset (GstVideoDecoder * decoder, gboolean hard)
GST_PAD_STREAM_UNLOCK (GST_VIDEO_DECODER_SRC_PAD (self));
GST_VIDEO_DECODER_STREAM_LOCK (self);
gst_omx_port_set_flushing (self->dec_in_port, FALSE);
gst_omx_port_set_flushing (self->dec_out_port, FALSE);
gst_omx_port_set_flushing (self->dec_in_port, 5 * GST_SECOND, FALSE);
gst_omx_port_set_flushing (self->dec_out_port, 5 * GST_SECOND, FALSE);
/* Start the srcpad loop again */
self->last_upstream_ts = 0;

View file

@ -492,10 +492,6 @@ gst_omx_video_enc_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (self->enc_in_port)
gst_omx_port_set_flushing (self->enc_in_port, FALSE);
if (self->enc_out_port)
gst_omx_port_set_flushing (self->enc_out_port, FALSE);
self->downstream_flow_ret = GST_FLOW_OK;
self->draining = FALSE;
@ -505,9 +501,9 @@ gst_omx_video_enc_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (self->enc_in_port)
gst_omx_port_set_flushing (self->enc_in_port, TRUE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, TRUE);
if (self->enc_out_port)
gst_omx_port_set_flushing (self->enc_out_port, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, TRUE);
g_mutex_lock (&self->drain_lock);
self->draining = FALSE;
@ -815,6 +811,10 @@ gst_omx_video_enc_loop (GstOMXVideoEnc * self)
if (err != OMX_ErrorNone)
goto reconfigure_error;
err = gst_omx_port_populate (port);
if (err != OMX_ErrorNone)
goto reconfigure_error;
err = gst_omx_port_mark_reconfigured (port);
if (err != OMX_ErrorNone)
goto reconfigure_error;
@ -969,8 +969,8 @@ gst_omx_video_enc_stop (GstVideoEncoder * encoder)
GST_DEBUG_OBJECT (self, "Stopping encoder");
gst_omx_port_set_flushing (self->enc_in_port, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, TRUE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, TRUE);
gst_pad_stop_task (GST_VIDEO_ENCODER_SRC_PAD (encoder));
@ -1243,8 +1243,12 @@ gst_omx_video_enc_set_format (GstVideoEncoder * encoder,
}
/* Unset flushing to allow ports to accept data again */
gst_omx_port_set_flushing (self->enc_in_port, FALSE);
gst_omx_port_set_flushing (self->enc_out_port, FALSE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, FALSE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, FALSE);
if (!needs_disable)
if (gst_omx_port_populate (self->enc_out_port) != OMX_ErrorNone)
return FALSE;
if (gst_omx_component_get_last_error (self->enc) != OMX_ErrorNone) {
GST_ERROR_OBJECT (self, "Component in error state: %s (0x%08x)",
@ -1275,8 +1279,8 @@ gst_omx_video_enc_reset (GstVideoEncoder * encoder, gboolean hard)
GST_DEBUG_OBJECT (self, "Resetting encoder");
gst_omx_port_set_flushing (self->enc_in_port, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, TRUE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, TRUE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, TRUE);
/* Wait until the srcpad loop is finished,
* unlock GST_VIDEO_ENCODER_STREAM_LOCK to prevent deadlocks
@ -1286,8 +1290,9 @@ gst_omx_video_enc_reset (GstVideoEncoder * encoder, gboolean hard)
GST_PAD_STREAM_UNLOCK (GST_VIDEO_ENCODER_SRC_PAD (self));
GST_VIDEO_ENCODER_STREAM_LOCK (self);
gst_omx_port_set_flushing (self->enc_in_port, FALSE);
gst_omx_port_set_flushing (self->enc_out_port, FALSE);
gst_omx_port_set_flushing (self->enc_in_port, 5 * GST_SECOND, FALSE);
gst_omx_port_set_flushing (self->enc_out_port, 5 * GST_SECOND, FALSE);
gst_omx_port_populate (self->enc_out_port);
/* Start the srcpad loop again */
self->last_upstream_ts = 0;