omx: Rework port reconfiguration

We always reconfigure all ports now if the settings of one
port changes to prevent lots of race conditions, dropped
frames and similar issues.
This commit is contained in:
Sebastian Dröge 2011-07-08 15:25:07 +02:00
parent 0fbff1000f
commit db08890edd
3 changed files with 240 additions and 122 deletions

View file

@ -233,48 +233,24 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
}
case OMX_EventPortSettingsChanged:
{
OMX_U32 index;
GstOMXPort *port = NULL;
gint i, n;
/* FIXME XXX: WTF? Bellagio passes
* the port index as *second* parameter
* instead of first...
*/
index = nData2;
g_mutex_lock (comp->state_lock);
GST_DEBUG_OBJECT (comp->parent, "Settings changed (cookie: %d -> %d)",
comp->settings_cookie, comp->settings_cookie + 1);
comp->settings_cookie++;
comp->reconfigure_out_pending = comp->n_out_ports;
g_mutex_unlock (comp->state_lock);
port = gst_omx_component_get_port (comp, index);
if (!port)
break;
/* Now wake up all ports */
n = comp->ports->len;
for (i = 0; i < n; i++) {
GstOMXPort *port = g_ptr_array_index (comp->ports, i);
GST_DEBUG_OBJECT (comp->parent, "Settings of port %u changed", index);
g_mutex_lock (port->port_lock);
port->settings_changed = TRUE;
g_cond_broadcast (port->port_cond);
g_mutex_unlock (port->port_lock);
/* FIXME XXX: Bellagio only sends the event for the
* input port even if the output port settings change
* too...
*/
{
gint i, n;
n = comp->ports->len;
for (i = 0; i < n; i++) {
port = g_ptr_array_index (comp->ports, i);
/* Don't notify the same port twice */
if (port->index == index)
continue;
g_mutex_lock (port->port_lock);
port->settings_changed = TRUE;
g_cond_broadcast (port->port_cond);
g_mutex_unlock (port->port_lock);
}
g_mutex_lock (port->port_lock);
g_cond_broadcast (port->port_cond);
g_mutex_unlock (port->port_lock);
}
break;
}
case OMX_EventPortFormatDetected:
@ -366,11 +342,15 @@ gst_omx_component_new (GstObject * parent, const gchar * core_name,
comp->parent = gst_object_ref (parent);
comp->ports = g_ptr_array_new ();
comp->n_in_ports = 0;
comp->n_out_ports = 0;
comp->state_lock = g_mutex_new ();
comp->state_cond = g_cond_new ();
comp->pending_state = OMX_StateInvalid;
comp->last_error = OMX_ErrorNone;
comp->settings_cookie = 0;
comp->reconfigure_out_pending = 0;
OMX_GetState (comp->handle, &comp->state);
@ -435,6 +415,10 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state)
comp->pending_state = state;
err = OMX_SendCommand (comp->handle, OMX_CommandStateSet, state, NULL);
/* Reset some things */
if (old_state == OMX_StateExecuting && state == OMX_StateIdle)
comp->reconfigure_out_pending = 0;
done:
g_mutex_unlock (comp->state_lock);
@ -560,6 +544,13 @@ gst_omx_component_add_port (GstOMXComponent * comp, guint32 index)
port->flushing = TRUE;
port->flushed = FALSE;
port->settings_changed = FALSE;
port->enabled_changed = FALSE;
port->settings_cookie = comp->settings_cookie;
if (port->port_def.eDir == OMX_DirInput)
comp->n_in_ports++;
else
comp->n_out_ports++;
g_ptr_array_add (comp->ports, port);
@ -587,6 +578,29 @@ gst_omx_component_get_port (GstOMXComponent * comp, guint32 index)
return NULL;
}
gint
gst_omx_component_get_settings_cookie (GstOMXComponent * comp)
{
gint cookie;
g_return_val_if_fail (comp != NULL, -1);
g_mutex_lock (comp->state_lock);
cookie = comp->settings_cookie;
g_mutex_unlock (comp->state_lock);
return cookie;
}
void
gst_omx_component_trigger_settings_changed (GstOMXComponent * comp)
{
g_return_if_fail (comp != NULL);
EventHandler (comp->handle, comp, OMX_EventPortSettingsChanged, OMX_ALL, 0,
NULL);
}
/* NOTE: This takes comp->state_lock *and* *all* port->port_lock! */
void
gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err)
@ -703,6 +717,8 @@ gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf)
g_mutex_lock (port->port_lock);
retry:
/* Check if the component is in an error state */
if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component is in error state: %d", err);
@ -716,58 +732,98 @@ gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf)
goto done;
}
/* Check if the port needs to be reconfigured */
if (port->settings_changed) {
GST_DEBUG_OBJECT (comp->parent, "Settings changed for port %u",
port->index);
ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURE;
goto done;
}
/* Wait until there's something in the queue
* or something else happened that requires
* to return a NULL buffer, e.g. an error
/* If this is an input port and it needs to be reconfigured we
* first wait here until all output ports are reconfigured and
* then return
*/
if (g_queue_is_empty (port->pending_buffers))
g_cond_wait (port->port_cond, port->port_lock);
if (port->port_def.eDir == OMX_DirInput &&
port->settings_cookie != gst_omx_component_get_settings_cookie (comp)) {
g_mutex_unlock (port->port_lock);
g_mutex_lock (comp->state_lock);
while (comp->reconfigure_out_pending > 0 &&
(err = comp->last_error) == OMX_ErrorNone && !port->flushing) {
GST_DEBUG_OBJECT (comp->parent,
"Waiting for %d output ports to reconfigure",
comp->reconfigure_out_pending);
g_cond_wait (comp->state_cond, comp->state_lock);
}
g_mutex_unlock (comp->state_lock);
g_mutex_lock (port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Got error while waiting: %d", err);
ret = GST_OMX_ACQUIRE_BUFFER_ERROR;
goto done;
} else if (port->flushing) {
GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing now", port->index);
ret = GST_OMX_ACQUIRE_BUFFER_FLUSHING;
goto done;
}
/* Check if the component is in an error state */
if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component is in error state: %d", err);
ret = GST_OMX_ACQUIRE_BUFFER_ERROR;
goto done;
}
/* Check if the port is flushing */
if (port->flushing) {
ret = GST_OMX_ACQUIRE_BUFFER_FLUSHING;
goto done;
}
/* Check if the port needs to be reconfigured */
/* FIXME: There might still be pending buffers for the
* previous configuration */
if (port->settings_changed) {
GST_DEBUG_OBJECT (comp->parent, "Settings changed for port %u",
port->index);
ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURE;
port->settings_changed = TRUE;
goto done;
}
_buf = g_queue_pop_head (port->pending_buffers);
/* If we have an output port that needs to be reconfigured
* and it still has buffers pending for the old configuration
* we first return them.
* NOTE: If buffers for this configuration arrive later
* we have to drop them... */
if (port->port_def.eDir == OMX_DirOutput &&
port->settings_cookie != gst_omx_component_get_settings_cookie (comp)) {
if (!g_queue_is_empty (port->pending_buffers)) {
GST_DEBUG_OBJECT (comp->parent,
"Output port %u needs reconfiguration but has buffers pending",
port->index);
_buf = g_queue_pop_head (port->pending_buffers);
g_assert (_buf != NULL);
ret = GST_OMX_ACQUIRE_BUFFER_OK;
goto done;
}
if (_buf) {
ret = GST_OMX_ACQUIRE_BUFFER_OK;
*buf = _buf;
} else {
GST_ERROR_OBJECT (comp->parent, "Unexpectactly got no buffer");
ret = GST_OMX_ACQUIRE_BUFFER_ERROR;
ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURE;
port->settings_changed = TRUE;
goto done;
}
if (port->settings_changed) {
GST_DEBUG_OBJECT (comp->parent,
"Port %u has settings changed, need new caps", port->index);
ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURED;
port->settings_changed = FALSE;
goto done;
}
/*
* At this point we have no error or flushing port
* and a properly configured port.
*
*/
/* If the queue is empty we wait until a buffer
* arrives, an error happens, the port is flushing
* or the port needs to be reconfigured.
*/
if (g_queue_is_empty (port->pending_buffers)) {
GST_DEBUG_OBJECT (comp->parent, "Queue of port %u is empty", port->index);
g_cond_wait (port->port_cond, port->port_lock);
} else {
GST_DEBUG_OBJECT (comp->parent, "Port %u has pending buffers");
_buf = g_queue_pop_head (port->pending_buffers);
ret = GST_OMX_ACQUIRE_BUFFER_OK;
goto done;
}
/* And now check everything again and maybe get a buffer */
goto retry;
done:
g_mutex_unlock (port->port_lock);
GST_DEBUG_OBJECT (comp->parent, "Acquired buffer %p from port %u: %d", buf,
if (_buf)
*buf = _buf;
GST_DEBUG_OBJECT (comp->parent, "Acquired buffer %p from port %u: %d", _buf,
port->index, ret);
return ret;
@ -790,16 +846,18 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf)
g_mutex_lock (port->port_lock);
if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component is in error state: %d", err);
goto done;
}
if (port->flushing) {
GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing, not releasing buffer",
port->index);
goto done;
}
if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component is in error state: %d", err);
goto done;
}
/* FIXME: What if the settings cookies don't match? */
buf->used = TRUE;
if (port->port_def.eDir == OMX_DirInput) {
@ -853,9 +911,21 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
g_mutex_unlock (comp->state_lock);
port->flushing = flush;
if (flush)
if (flush) {
g_cond_broadcast (port->port_cond);
/* We also need to signal the state cond because
* an input port might wait on this for the output
* ports to reconfigure. This will not confuse
* other waiters on the state cond because they will
* additionally check if the condition they're waiting
* for is true after waking up.
*/
g_mutex_lock (comp->state_lock);
g_cond_broadcast (comp->state_cond);
g_mutex_unlock (comp->state_lock);
}
if (flush) {
GTimeVal abstimeout, *timeval;
gboolean signalled;
@ -1012,6 +1082,7 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port)
buf = g_slice_new0 (GstOMXBuffer);
buf->port = port;
buf->used = FALSE;
buf->settings_cookie = port->settings_cookie;
g_ptr_array_add (port->buffers, buf);
err =
@ -1312,26 +1383,6 @@ gst_omx_port_is_enabled (GstOMXPort * port)
return enabled;
}
gboolean
gst_omx_port_is_settings_changed (GstOMXPort * port)
{
GstOMXComponent *comp;
gboolean settings_changed;
g_return_val_if_fail (port != NULL, FALSE);
comp = port->comp;
g_mutex_lock (port->port_lock);
settings_changed = port->settings_changed;
g_mutex_unlock (port->port_lock);
GST_DEBUG_OBJECT (comp->parent, "Port %u has settings-changed: %d",
port->index, settings_changed);
return settings_changed;
}
OMX_ERRORTYPE
gst_omx_port_reconfigure (GstOMXPort * port)
{
@ -1360,7 +1411,18 @@ gst_omx_port_reconfigure (GstOMXPort * port)
if (err != OMX_ErrorNone)
goto done;
port->settings_changed = FALSE;
port->settings_cookie = comp->settings_cookie;
/* If this is an output port, notify all input ports
* that might wait for us to reconfigure in
* acquire_buffer()
*/
if (port->port_def.eDir == OMX_DirOutput) {
g_mutex_lock (comp->state_lock);
comp->reconfigure_out_pending--;
g_cond_broadcast (comp->state_cond);
g_mutex_unlock (comp->state_lock);
}
done:
GST_DEBUG_OBJECT (comp->parent, "Reconfigured port %u: %d", port->index, err);

View file

@ -34,9 +34,16 @@ typedef struct _GstOMXComponent GstOMXComponent;
typedef struct _GstOMXBuffer GstOMXBuffer;
typedef enum {
/* Everything good and the buffer is valid */
GST_OMX_ACQUIRE_BUFFER_OK = 0,
/* The port is flushing, exit ASAP */
GST_OMX_ACQUIRE_BUFFER_FLUSHING,
/* The port must be reconfigured */
GST_OMX_ACQUIRE_BUFFER_RECONFIGURE,
/* The port was reconfigured and the caps might have changed
* NOTE: This is only returned a single time! */
GST_OMX_ACQUIRE_BUFFER_RECONFIGURED,
/* A fatal error happened */
GST_OMX_ACQUIRE_BUFFER_ERROR
} GstOMXAcquireBufferReturn;
@ -64,13 +71,16 @@ struct _GstOMXPort {
guint32 index;
/* Protects port_def, buffers, pending_buffers,
* settings_changed, flushing, flushed, enabled_changed.
* settings_changed, flushing, flushed, enabled_changed
* and settings_cookie.
*
* Signalled if pending_buffers gets a
* new buffer or flushing/flushed is set
* to TRUE or an error happens. Always
* check comp->last_error after being
* signalled!
* to TRUE or the port is enabled/disabled
* or the settings change or an error happens.
*
* Note: Always check comp->last_error before
* waiting and after being signalled!
*
* Note: flushed==TRUE implies flushing==TRUE!
*
@ -82,12 +92,15 @@ struct _GstOMXPort {
OMX_PARAM_PORTDEFINITIONTYPE port_def;
GPtrArray *buffers; /* Contains GstOMXBuffer* */
GQueue *pending_buffers; /* Contains GstOMXBuffer* */
/* If TRUE we need to get the new caps
* of this port */
/* If TRUE we need to get the new caps of this port */
gboolean settings_changed;
gboolean flushing;
gboolean flushed; /* TRUE after OMX_CommandFlush was done */
gboolean enabled_changed; /* TRUE after OMX_Command{En,Dis}able was done */
/* If not equal to comp->settings_cookie we need
* to reconfigure this port */
gint settings_cookie;
};
struct _GstOMXComponent {
@ -96,8 +109,10 @@ struct _GstOMXComponent {
GstOMXCore *core;
GPtrArray *ports; /* Contains GstOMXPort* */
gint n_in_ports, n_out_ports;
/* Protecting state, pending_state and last_error
/* Protecting state, pending_state, last_error
* and settings_cookie.
* Signalled if one of them changes
*/
GMutex *state_lock;
@ -107,6 +122,14 @@ struct _GstOMXComponent {
OMX_STATETYPE pending_state;
/* OMX_ErrorNone usually, if different nothing will work */
OMX_ERRORTYPE last_error;
/* Updated whenever settings of any port are changing.
* We always reconfigure all ports */
gint settings_cookie;
/* Number of output ports that must still be reconfigured.
* If any are pending no input port will be reconfigured
* or will accept any data and wait.
*/
gint reconfigure_out_pending;
};
struct _GstOMXBuffer {
@ -117,6 +140,9 @@ struct _GstOMXBuffer {
* between {Empty,Fill}ThisBuffer and the callback
*/
gboolean used;
/* Cookie of the settings when this buffer was allocated */
gint settings_cookie;
};
GstOMXCore * gst_omx_core_acquire (const gchar * filename);
@ -135,6 +161,9 @@ OMX_ERRORTYPE gst_omx_component_get_last_error (GstOMXComponent * comp);
GstOMXPort * gst_omx_component_add_port (GstOMXComponent * comp, guint32 index);
GstOMXPort * gst_omx_component_get_port (GstOMXComponent * comp, guint32 index);
gint gst_omx_component_get_settings_cookie (GstOMXComponent * comp);
void gst_omx_component_trigger_settings_changed (GstOMXComponent * comp);
void gst_omx_port_get_port_definition (GstOMXPort * port, OMX_PARAM_PORTDEFINITIONTYPE * port_def);
gboolean gst_omx_port_update_port_definition (GstOMXPort *port, OMX_PARAM_PORTDEFINITIONTYPE *port_definition);
@ -153,8 +182,6 @@ OMX_ERRORTYPE gst_omx_port_reconfigure (GstOMXPort * port);
OMX_ERRORTYPE gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled);
gboolean gst_omx_port_is_enabled (GstOMXPort * port);
gboolean gst_omx_port_is_settings_changed (GstOMXPort * port);
G_END_DECLS
#endif /* __GST_OMX_H__ */

View file

@ -307,13 +307,20 @@ gst_omx_video_dec_loop (GstOMXVideoDec * self)
goto component_error;
} else if (acq_return == GST_OMX_ACQUIRE_BUFFER_FLUSHING) {
goto flushing;
} else if (acq_return == GST_OMX_ACQUIRE_BUFFER_RECONFIGURE) {
if (gst_omx_port_reconfigure (self->out_port) != OMX_ErrorNone)
goto reconfigure_error;
/* And restart the loop */
return;
}
if (!GST_PAD_CAPS (GST_BASE_VIDEO_CODEC_SRC_PAD (self))
|| acq_return == GST_OMX_ACQUIRE_BUFFER_RECONFIGURE) {
|| acq_return == GST_OMX_ACQUIRE_BUFFER_RECONFIGURED) {
GstVideoState *state = &GST_BASE_VIDEO_CODEC (self)->state;
OMX_PARAM_PORTDEFINITIONTYPE port_def;
GST_DEBUG_OBJECT (self, "Port settings have changed, updating caps");
gst_omx_port_get_port_definition (port, &port_def);
g_assert (port_def.format.video.eCompressionFormat ==
OMX_VIDEO_CodingUnused);
@ -331,13 +338,15 @@ gst_omx_video_dec_loop (GstOMXVideoDec * self)
state->height = port_def.format.video.nFrameHeight;
/* FIXME XXX: Bellagio does not set this to something useful... */
/* gst_util_double_to_fraction (port_def.format.video.xFramerate / ((gdouble) 0xffff), &state->fps_n, &state->fps_d); */
gst_base_video_decoder_set_src_caps (GST_BASE_VIDEO_DECODER (self));
if (acq_return == GST_OMX_ACQUIRE_BUFFER_RECONFIGURE) {
if (gst_omx_port_reconfigure (self->out_port) != OMX_ErrorNone)
goto reconfigure_error;
return;
if (!gst_base_video_decoder_set_src_caps (GST_BASE_VIDEO_DECODER (self))) {
if (buf)
gst_omx_port_release_buffer (self->out_port, buf);
goto caps_failed;
}
/* Now get a buffer */
if (acq_return != GST_OMX_ACQUIRE_BUFFER_OK)
return;
}
g_assert (acq_return == GST_OMX_ACQUIRE_BUFFER_OK && buf != NULL);
@ -349,8 +358,16 @@ gst_omx_video_dec_loop (GstOMXVideoDec * self)
if (!frame) {
GST_ERROR_OBJECT (self, "No corresponding frame found");
} else if (buf->omx_buf->nFilledLen > 0) {
if (gst_base_video_decoder_alloc_src_frame (GST_BASE_VIDEO_DECODER (self),
frame) == GST_FLOW_OK) {
if (GST_BASE_VIDEO_CODEC (self)->state.bytes_per_picture == 0) {
/* FIXME: If the sinkpad caps change we have currently no way
* to allocate new src buffers because basevideodecoder assumes
* that the caps on both pads are equivalent all the time
*/
GST_WARNING_OBJECT (self,
"Caps change pending and still have buffers for old caps -- dropping");
} else
if (gst_base_video_decoder_alloc_src_frame (GST_BASE_VIDEO_DECODER
(self), frame) == GST_FLOW_OK) {
/* FIXME: This currently happens because of a race condition too.
* We first need to reconfigure the output port and then the input
* port if both need reconfiguration.
@ -442,6 +459,14 @@ invalid_frame_size:
gst_pad_pause_task (GST_BASE_VIDEO_CODEC_SRC_PAD (self));
return;
}
caps_failed:
{
GST_ELEMENT_ERROR (self, LIBRARY, SETTINGS, (NULL), ("Failed to set caps"));
gst_pad_push_event (GST_BASE_VIDEO_CODEC_SRC_PAD (self),
gst_event_new_eos ());
gst_pad_pause_task (GST_BASE_VIDEO_CODEC_SRC_PAD (self));
return;
}
}
static gboolean
@ -550,6 +575,7 @@ gst_omx_video_dec_set_format (GstBaseVideoDecoder * decoder,
if (needs_disable) {
if (gst_omx_port_set_enabled (self->in_port, TRUE) != OMX_ErrorNone)
return FALSE;
gst_omx_component_trigger_settings_changed (self->component);
} else {
if (gst_omx_component_set_state (self->component,
OMX_StateIdle) != OMX_ErrorNone)
@ -650,6 +676,9 @@ gst_omx_video_dec_handle_frame (GstBaseVideoDecoder * decoder,
goto reconfigure_error;
/* Now get a new buffer and fill it */
continue;
} else if (acq_ret == GST_OMX_ACQUIRE_BUFFER_RECONFIGURED) {
/* TODO: Anything to do here? Don't think so */
continue;
}
g_assert (acq_ret == GST_OMX_ACQUIRE_BUFFER_OK && buf != NULL);