Implement a new custom recursive mutex type and fix locking in callbacks so that in-context calls are allowed.

According to the OMX specification, implementations are allowed to call
callbacks in the context of their function calls. However, our callbacks
take locks and this causes deadlocks if the unerlying OMX implementation
uses this kind of in-context calls.

A solution to the problem would be a recursive mutex. However, a normal
recursive mutex does not fix the problem because it is not guaranteed
that the callbacks are called from the same thread. What we see in Broadcom's
implementation for example is:

- OMX_Foo is called
- OMX_Foo waits on a condition
- A callback is executed in a different thread
- When the callback returns, its calling function
  signals the condition that OMX_Foo waits on
- OMX_Foo wakes up and returns

The solution I came up with here is to take a second lock inside the callback,
but only if recursion is expected to happen. Therefore, all calls to OMX
functions are guarded by calls to gst_omx_rec_mutex_begin_recursion() / _end_recursion(),
which effectively tells the mutex that at this point we want to allow calls
to _recursive_lock() to succeed, although we are still holding the master lock.
This commit is contained in:
George Kiagiadakis 2012-04-30 23:58:43 +03:00
parent 5c15caef8e
commit 158775f497
8 changed files with 328 additions and 99 deletions

View file

@ -16,7 +16,8 @@ libgstopenmax_la_SOURCES = \
gstbasevideocodec.c \
gstbasevideodecoder.c \
gstbasevideoencoder.c \
gstbasevideoutils.c
gstbasevideoutils.c \
gstomxrecmutex.c
noinst_HEADERS = \
gstomx.h \
@ -34,7 +35,8 @@ noinst_HEADERS = \
gstbasevideocodec.h \
gstbasevideodecoder.h \
gstbasevideoencoder.h \
gstbasevideoutils.h
gstbasevideoutils.h \
gstomxrecmutex.h
fixbaseclasses = \
-DGstBaseVideoCodec=OMXBaseVideoCodec \

View file

@ -163,12 +163,12 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
* a state change to be finished */
GST_DEBUG_OBJECT (comp->parent, "State change to %d finished",
nData2);
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_recursive_lock (&comp->state_lock);
comp->state = (OMX_STATETYPE) nData2;
if (comp->state == comp->pending_state)
comp->pending_state = OMX_StateInvalid;
g_cond_broadcast (comp->state_cond);
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_recursive_unlock (&comp->state_lock);
break;
}
case OMX_CommandFlush:{
@ -185,7 +185,7 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
* that the port is really flushed now and
* we can continue
*/
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_recursive_lock (&port->port_lock);
/* FIXME: If this is ever called when the port
* was not set to flushing something went
* wrong but it happens for some reason.
@ -197,7 +197,7 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
GST_ERROR_OBJECT (comp->parent, "Port %u was not flushing",
port->index);
}
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
break;
}
case OMX_CommandPortEnable:
@ -212,10 +212,10 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
GST_DEBUG_OBJECT (comp->parent, "Port %u %s", port->index,
(cmd == OMX_CommandPortEnable ? "enabled" : "disabled"));
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_recursive_lock (&port->port_lock);
port->enabled_changed = TRUE;
g_cond_broadcast (port->port_cond);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
break;
}
@ -265,17 +265,17 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
for (i = 0; i < n; i++) {
GstOMXPort *port = g_ptr_array_index (comp->ports, i);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_recursive_lock (&port->port_lock);
if (port_index == OMX_ALL || port_index == port->index) {
port->settings_cookie++;
if (port->port_def.eDir == OMX_DirOutput)
outports = g_list_prepend (outports, port);
g_cond_broadcast (port->port_cond);
}
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
}
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_recursive_lock (&comp->state_lock);
for (k = outports; k; k = k->next) {
gboolean found = FALSE;
@ -293,7 +293,7 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
if (comp->pending_reconfigure_outports)
g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1);
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_recursive_unlock (&comp->state_lock);
g_list_free (outports);
@ -328,7 +328,7 @@ EmptyBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData,
/* Input buffer is empty again and can
* be used to contain new input */
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_recursive_lock (&port->port_lock);
GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)",
port->index, buf, buf->omx_buf->pBuffer);
buf->used = FALSE;
@ -353,7 +353,7 @@ EmptyBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData,
g_queue_push_tail (port->pending_buffers, buf);
g_cond_broadcast (port->port_cond);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
return OMX_ErrorNone;
}
@ -378,13 +378,13 @@ FillBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData,
/* Output buffer contains output now or
* the port was flushed */
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_recursive_lock (&port->port_lock);
GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)", port->index,
buf, buf->omx_buf->pBuffer);
buf->used = FALSE;
g_queue_push_tail (port->pending_buffers, buf);
g_cond_broadcast (port->port_cond);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
return OMX_ErrorNone;
}
@ -427,7 +427,7 @@ gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata)
comp->n_in_ports = 0;
comp->n_out_ports = 0;
comp->state_lock = g_mutex_new ();
gst_omx_rec_mutex_init (&comp->state_lock);
comp->state_cond = g_cond_new ();
comp->pending_state = OMX_StateInvalid;
comp->last_error = OMX_ErrorNone;
@ -475,7 +475,7 @@ gst_omx_component_free (GstOMXComponent * comp)
gst_omx_port_deallocate_buffers (port);
g_mutex_free (port->port_lock);
gst_omx_rec_mutex_clear (&port->port_lock);
g_cond_free (port->port_cond);
g_queue_free (port->pending_buffers);
@ -493,7 +493,7 @@ gst_omx_component_free (GstOMXComponent * comp)
gst_omx_core_release (comp->core);
g_cond_free (comp->state_cond);
g_mutex_free (comp->state_lock);
gst_omx_rec_mutex_clear (&comp->state_lock);
gst_object_unref (comp->parent);
@ -508,7 +508,7 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state)
g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined);
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
old_state = comp->state;
GST_DEBUG_OBJECT (comp->parent, "Setting state from %d to %d", old_state,
state);
@ -534,15 +534,13 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state)
g_cond_broadcast (comp->state_cond);
}
/* Release lock because the command could call into the callbacks,
* which take the lock again */
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_begin_recursion (&comp->state_lock);
err = OMX_SendCommand (comp->handle, OMX_CommandStateSet, state, NULL);
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_end_recursion (&comp->state_lock);
/* No need to check if anything has changed here */
done:
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
@ -564,7 +562,7 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout)
GST_DEBUG_OBJECT (comp->parent, "Getting state");
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
ret = comp->state;
if (comp->pending_state == OMX_StateInvalid)
goto done;
@ -592,7 +590,8 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout)
}
do {
signalled = g_cond_timed_wait (comp->state_cond, comp->state_lock, timeval);
signalled =
g_cond_timed_wait (comp->state_cond, comp->state_lock.lock, timeval);
} while (signalled && comp->last_error == OMX_ErrorNone
&& comp->pending_state != OMX_StateInvalid);
@ -615,7 +614,7 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout)
}
done:
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
/* If we waited and timed out this component is unusable now */
if (!signalled)
@ -662,7 +661,7 @@ gst_omx_component_add_port (GstOMXComponent * comp, guint32 index)
port->port_def = port_def;
port->port_lock = g_mutex_new ();
gst_omx_rec_mutex_init (&port->port_lock);
port->port_cond = g_cond_new ();
port->pending_buffers = g_queue_new ();
port->flushing = TRUE;
@ -735,14 +734,14 @@ gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err)
GST_ERROR_OBJECT (comp->parent, "Setting last error: %s (0x%08x)",
gst_omx_error_to_string (err), err);
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
/* We only set the first error ever from which
* we can't recover anymore.
*/
if (comp->last_error == OMX_ErrorNone)
comp->last_error = err;
g_cond_broadcast (comp->state_cond);
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
/* Now notify all ports, no locking needed
* here because the ports are allocated in the
@ -753,9 +752,9 @@ gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err)
for (i = 0; i < n; i++) {
GstOMXPort *tmp = g_ptr_array_index (comp->ports, i);
g_mutex_lock (tmp->port_lock);
gst_omx_rec_mutex_lock (&tmp->port_lock);
g_cond_broadcast (tmp->port_cond);
g_mutex_unlock (tmp->port_lock);
gst_omx_rec_mutex_unlock (&tmp->port_lock);
}
}
@ -766,9 +765,9 @@ gst_omx_component_get_last_error (GstOMXComponent * comp)
g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined);
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
err = comp->last_error;
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
GST_DEBUG_OBJECT (comp->parent, "Returning last error: %s (0x%08x)",
gst_omx_error_to_string (err), err);
@ -882,7 +881,7 @@ gst_omx_port_update_port_definition (GstOMXPort * port,
comp = port->comp;
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
if (port_def)
err =
gst_omx_component_set_parameter (comp, OMX_IndexParamPortDefinition,
@ -893,7 +892,7 @@ gst_omx_port_update_port_definition (GstOMXPort * port,
GST_DEBUG_OBJECT (comp->parent, "Updated port %u definition: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
return (err == OMX_ErrorNone);
}
@ -915,7 +914,7 @@ gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf)
GST_DEBUG_OBJECT (comp->parent, "Acquiring buffer from port %u", port->index);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
retry:
@ -940,16 +939,16 @@ retry:
*/
if (port->port_def.eDir == OMX_DirInput) {
if (g_atomic_int_get (&comp->have_pending_reconfigure_outports)) {
g_mutex_unlock (port->port_lock);
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
while (g_atomic_int_get (&comp->have_pending_reconfigure_outports) &&
(err = comp->last_error) == OMX_ErrorNone && !port->flushing) {
GST_DEBUG_OBJECT (comp->parent,
"Waiting for output ports to reconfigure");
g_cond_wait (comp->state_cond, comp->state_lock);
g_cond_wait (comp->state_cond, comp->state_lock.lock);
}
g_mutex_unlock (comp->state_lock);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
goto retry;
}
@ -1004,7 +1003,7 @@ retry:
*/
if (g_queue_is_empty (port->pending_buffers)) {
GST_DEBUG_OBJECT (comp->parent, "Queue of port %u is empty", port->index);
g_cond_wait (port->port_cond, port->port_lock);
g_cond_wait (port->port_cond, port->port_lock.lock);
} else {
GST_DEBUG_OBJECT (comp->parent, "Port %u has pending buffers",
port->pending_buffers);
@ -1017,7 +1016,7 @@ retry:
goto retry;
done:
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
if (_buf) {
g_assert (_buf == _buf->omx_buf->pAppPrivate);
@ -1045,7 +1044,7 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf)
GST_DEBUG_OBJECT (comp->parent, "Releasing buffer %p (%p) to port %u",
buf, buf->omx_buf->pBuffer, port->index);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
if (port->port_def.eDir == OMX_DirInput) {
/* Reset all flags, some implementations don't
@ -1076,17 +1075,20 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf)
/* FIXME: What if the settings cookies don't match? */
buf->used = TRUE;
gst_omx_rec_mutex_begin_recursion (&port->port_lock);
if (port->port_def.eDir == OMX_DirInput) {
err = OMX_EmptyThisBuffer (comp->handle, buf->omx_buf);
} else {
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
}
gst_omx_rec_mutex_end_recursion (&port->port_lock);
GST_DEBUG_OBJECT (comp->parent, "Released buffer %p to port %u: %s (0x%08x)",
buf, port->index, gst_omx_error_to_string (err), err);
done:
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
return err;
}
@ -1103,7 +1105,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
GST_DEBUG_OBJECT (comp->parent, "Setting port %d to %sflushing",
port->index, (flush ? "" : "not "));
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
if (! !flush == ! !port->flushing) {
GST_DEBUG_OBJECT (comp->parent, "Port %u was %sflushing already",
port->index, (flush ? "" : "not "));
@ -1116,17 +1118,17 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
goto done;
}
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
if (comp->state != OMX_StateIdle && comp->state != OMX_StateExecuting) {
GST_DEBUG_OBJECT (comp->parent, "Component is in wrong state: %d",
comp->state);
err = OMX_ErrorUndefined;
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
goto done;
}
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
port->flushing = flush;
if (flush) {
@ -1143,17 +1145,16 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
* additionally check if the condition they're waiting
* for is true after waking up.
*/
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
g_cond_broadcast (comp->state_cond);
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
/* Now flush the port */
port->flushed = FALSE;
/* Unlock because this could call one of the callbacks which
* also take the lock */
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_begin_recursion (&port->port_lock);
err = OMX_SendCommand (comp->handle, OMX_CommandFlush, port->index, NULL);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
@ -1186,7 +1187,8 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
last_error = OMX_ErrorNone;
while (signalled && last_error == OMX_ErrorNone && !port->flushed
&& port->buffers->len > g_queue_get_length (port->pending_buffers)) {
signalled = g_cond_timed_wait (port->port_cond, port->port_lock, timeval);
signalled =
g_cond_timed_wait (port->port_cond, port->port_lock.lock, timeval);
last_error = gst_omx_component_get_last_error (comp);
}
@ -1221,9 +1223,11 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
* valid anymore after the buffer was consumed
*/
buf->omx_buf->nFlags = 0;
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_begin_recursion (&port->port_lock);
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
"Failed to pass buffer %p (%p) to port %u: %s (0x%08x)", buf,
@ -1240,7 +1244,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
done:
GST_DEBUG_OBJECT (comp->parent, "Set port %u to %sflushing: %s (0x%08x)",
port->index, (flush ? "" : "not "), gst_omx_error_to_string (err), err);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
return err;
@ -1250,9 +1254,9 @@ error:
* set_last_error() needs all port locks.
* This is safe here because we're just going
* to error out anyway */
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
gst_omx_component_set_last_error (comp, err);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
goto done;
}
}
@ -1267,9 +1271,9 @@ gst_omx_port_is_flushing (GstOMXPort * port)
comp = port->comp;
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
flushing = port->flushing;
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing: %d", port->index,
flushing);
@ -1338,9 +1342,12 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port)
buf->settings_cookie = port->settings_cookie;
g_ptr_array_add (port->buffers, buf);
gst_omx_rec_mutex_begin_recursion (&port->port_lock);
err =
OMX_AllocateBuffer (comp->handle, &buf->omx_buf, port->index, buf,
port->port_def.nBufferSize);
gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
"Failed to allocate buffer for port %u: %s (0x%08x)", port->index,
@ -1369,9 +1376,9 @@ error:
* set_last_error() needs all port locks.
* This is safe here because we're just going
* to error out anyway */
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
gst_omx_component_set_last_error (comp, err);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
goto done;
}
}
@ -1383,9 +1390,9 @@ gst_omx_port_allocate_buffers (GstOMXPort * port)
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
err = gst_omx_port_allocate_buffers_unlocked (port);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
return err;
}
@ -1439,7 +1446,11 @@ gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port)
buf->omx_buf->pAppPrivate = NULL;
GST_DEBUG_OBJECT (comp->parent, "Deallocating buffer %p (%p)", buf,
buf->omx_buf->pBuffer);
gst_omx_rec_mutex_begin_recursion (&port->port_lock);
tmp = OMX_FreeBuffer (comp->handle, port->index, buf->omx_buf);
gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (tmp != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
"Failed to deallocate buffer %d of port %u: %s (0x%08x)", i,
@ -1473,9 +1484,9 @@ gst_omx_port_deallocate_buffers (GstOMXPort * port)
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
err = gst_omx_port_deallocate_buffers_unlocked (port);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
return err;
}
@ -1516,7 +1527,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
port->flushing = TRUE;
}
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_begin_recursion (&port->port_lock);
if (enabled)
err =
OMX_SendCommand (comp->handle, OMX_CommandPortEnable, port->index,
@ -1525,7 +1536,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
err =
OMX_SendCommand (comp->handle, OMX_CommandPortDisable,
port->index, NULL);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
@ -1555,7 +1566,8 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
last_error = OMX_ErrorNone;
while (signalled && last_error == OMX_ErrorNone && (port->buffers
&& port->buffers->len > g_queue_get_length (port->pending_buffers))) {
signalled = g_cond_timed_wait (port->port_cond, port->port_lock, timeval);
signalled =
g_cond_timed_wait (port->port_cond, port->port_lock.lock, timeval);
last_error = gst_omx_component_get_last_error (comp);
}
@ -1594,7 +1606,8 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
&port->port_def);
while (signalled && last_error == OMX_ErrorNone
&& (! !port->port_def.bEnabled != ! !enabled || !port->enabled_changed)) {
signalled = g_cond_timed_wait (port->port_cond, port->port_lock, timeval);
signalled =
g_cond_timed_wait (port->port_cond, port->port_lock.lock, timeval);
last_error = gst_omx_component_get_last_error (comp);
gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition,
&port->port_def);
@ -1634,7 +1647,11 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
* valid anymore after the buffer was consumed
*/
buf->omx_buf->nFlags = 0;
gst_omx_rec_mutex_begin_recursion (&port->port_lock);
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
"Failed to pass buffer %p (%p) to port %u: %s (0x%08x)", buf,
@ -1661,9 +1678,9 @@ error:
* set_last_error() needs all port locks.
* This is safe here because we're just going
* to error out anyway */
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
gst_omx_component_set_last_error (comp, err);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
goto done;
}
}
@ -1675,9 +1692,9 @@ gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled)
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
err = gst_omx_port_set_enabled_unlocked (port, enabled);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
return err;
}
@ -1692,11 +1709,11 @@ gst_omx_port_is_enabled (GstOMXPort * port)
comp = port->comp;
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition,
&port->port_def);
enabled = port->port_def.bEnabled;
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
GST_DEBUG_OBJECT (comp->parent, "Port %u is enabled: %d", port->index,
enabled);
@ -1716,7 +1733,7 @@ gst_omx_port_reconfigure (GstOMXPort * port)
GST_DEBUG_OBJECT (comp->parent, "Reconfiguring port %u", port->index);
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
if (!port->settings_changed)
goto done;
@ -1744,7 +1761,7 @@ gst_omx_port_reconfigure (GstOMXPort * port)
if (port->port_def.eDir == OMX_DirOutput) {
GList *l;
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
for (l = comp->pending_reconfigure_outports; l; l = l->next) {
if (l->data == (gpointer) port) {
comp->pending_reconfigure_outports =
@ -1756,14 +1773,14 @@ gst_omx_port_reconfigure (GstOMXPort * port)
g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0);
g_cond_broadcast (comp->state_cond);
}
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
}
done:
GST_DEBUG_OBJECT (comp->parent, "Reconfigured port %u: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
return err;
}
@ -1780,7 +1797,7 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u %s",
port->index, (start ? "start" : "stsop"));
g_mutex_lock (port->port_lock);
gst_omx_rec_mutex_lock (&port->port_lock);
if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone)
goto done;
@ -1794,7 +1811,7 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
GList *l;
if (start) {
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
for (l = comp->pending_reconfigure_outports; l; l = l->next) {
if (l->data == (gpointer) port)
break;
@ -1805,9 +1822,9 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
g_list_prepend (comp->pending_reconfigure_outports, port);
g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1);
}
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
} else {
g_mutex_lock (comp->state_lock);
gst_omx_rec_mutex_lock (&comp->state_lock);
for (l = comp->pending_reconfigure_outports; l; l = l->next) {
if (l->data == (gpointer) port) {
comp->pending_reconfigure_outports =
@ -1819,13 +1836,13 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0);
g_cond_broadcast (comp->state_cond);
}
g_mutex_unlock (comp->state_lock);
gst_omx_rec_mutex_unlock (&comp->state_lock);
}
}
done:
g_mutex_unlock (port->port_lock);
gst_omx_rec_mutex_unlock (&port->port_lock);
GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);

View file

@ -27,6 +27,8 @@
#include <OMX_Core.h>
#include <OMX_Component.h>
#include "gstomxrecmutex.h"
G_BEGIN_DECLS
#define GST_OMX_INIT_STRUCT(st) G_STMT_START { \
@ -138,7 +140,7 @@ struct _GstOMXPort {
* Note: This lock must always be taken before
* the component's state lock if both are needed!
*/
GMutex *port_lock;
GstOMXRecMutex port_lock;
GCond *port_cond;
OMX_PARAM_PORTDEFINITIONTYPE port_def;
GPtrArray *buffers; /* Contains GstOMXBuffer* */
@ -171,7 +173,7 @@ struct _GstOMXComponent {
* pending_reconfigure_outports.
* Signalled if one of them changes
*/
GMutex *state_lock;
GstOMXRecMutex state_lock;
GCond *state_cond;
OMX_STATETYPE state;
/* OMX_StateInvalid if no pending state */

View file

@ -885,10 +885,10 @@ gst_omx_audio_enc_sink_event (GstAudioEncoder * encoder, GstEvent * event)
GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers");
/* Insert a NULL into the queue to signal EOS */
g_mutex_lock (self->out_port->port_lock);
gst_omx_rec_mutex_lock (&self->out_port->port_lock);
g_queue_push_tail (self->out_port->pending_buffers, NULL);
g_cond_broadcast (self->out_port->port_cond);
g_mutex_unlock (self->out_port->port_lock);
gst_omx_rec_mutex_unlock (&self->out_port->port_lock);
return TRUE;
}

97
omx/gstomxrecmutex.c Normal file
View file

@ -0,0 +1,97 @@
/*
* Copyright (C) 2012, Collabora Ltd.
* Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation
* version 2.1 of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstomxrecmutex.h"
void
gst_omx_rec_mutex_init (GstOMXRecMutex * mutex)
{
mutex->lock = g_mutex_new ();
mutex->recursion_lock = g_mutex_new ();
mutex->recursion_allowed = FALSE;
}
void
gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex)
{
g_mutex_free (mutex->lock);
g_mutex_free (mutex->recursion_lock);
}
void
gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex)
{
g_mutex_lock (mutex->lock);
}
void
gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex)
{
g_mutex_unlock (mutex->lock);
}
/* must be called with mutex->lock taken */
void
gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex)
{
g_mutex_lock (mutex->recursion_lock);
g_assert (mutex->recursion_allowed == FALSE);
mutex->recursion_allowed = TRUE;
g_mutex_unlock (mutex->recursion_lock);
}
/* must be called with mutex->lock taken */
void
gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex)
{
g_mutex_lock (mutex->recursion_lock);
g_assert (mutex->recursion_allowed == TRUE);
mutex->recursion_allowed = FALSE;
g_mutex_unlock (mutex->recursion_lock);
}
void
gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex)
{
g_mutex_lock (mutex->recursion_lock);
if (!mutex->recursion_allowed) {
/* no recursion allowed, lock the proper mutex */
g_mutex_unlock (mutex->recursion_lock);
g_mutex_lock (mutex->lock);
}
}
void
gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex)
{
/* It is safe to check recursion_allowed here because
* we hold at least one of the two locks and
* either lock protects it from being changed.
*/
if (mutex->recursion_allowed) {
g_mutex_unlock (mutex->recursion_lock);
} else {
g_mutex_unlock (mutex->lock);
}
}

111
omx/gstomxrecmutex.h Normal file
View file

@ -0,0 +1,111 @@
/*
* Copyright (C) 2012, Collabora Ltd.
* Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation
* version 2.1 of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#ifndef __GST_OMX_REC_MUTEX_H__
#define __GST_OMX_REC_MUTEX_H__
#include <glib.h>
G_BEGIN_DECLS
/*
* This is a recursive mutex implementation that serves a very specific
* purpose; it is used to allow OpenMAX callbacks to be run in the context
* of some OpenMAX function call while the calling function is holding the
* master lock.
*
* According to the OpenMAX specification, we have 2 possible ways that
* callbacks may be called. First, we have out-of-context calls, which means
* that callbacks are called from a different thread at any point in time.
* In this case, callbacks must take the appropriate lock to protect the data
* that they are changing. Second, we have in-context calls, which means
* that callbacks are called when we call some OpenMAX function, before this
* function returns. In this case, we need to allow the callback to run
* without taking any locks that the caller of the OpenMAX function is holding.
*
* This can be solved by a recusrive mutex. However, a normal GRecMutex is
* not enough because it allows being locked multiple times only from
* the same thread. Unfortunatly, what we see in Broadcom's implementation,
* for instance, OpenMAX callbacks may be in-context, but from a different
* thread. This is achieved like this:
*
* - OMX_Foo is called
* - OMX_Foo waits on a condition
* - The callback is executed in a different thread
* - When the callback returns, its calling function
* signals the condition that OMX_Foo waits on
* - OMX_Foo wakes up and returns
*
* This recursive mutex implementation attempts to fix this problem.
* This is achieved like this: All functions lock this mutex normally
* using gst_omx_rec_mutex_lock() / _unlock(). These functions
* effectively lock the master mutex and they are identical in behavior
* with g_mutex_lock() / _unlock(). When a function that has already
* locked this mutex is about to call some OpenMAX function, it must
* call gst_omx_rec_mutex_begin_recursion() to indicate that recursive
* locking is now allowed, and similarly, call gst_omx_rec_mutex_end_recursion()
* after the OpenMAX function has returned to indicate that no recursive
* locking is allowed from this point on. Callbacks should lock the
* mutex using gst_omx_rec_mutex_recursive_lock() / _recursive_unlock().
* These two functions, depending on whether recursion is allowed
* will take/release either the master lock or the recursion_lock.
* Effectively, this allows callbacks to run in the context any code between
* calls to gst_omx_rec_mutex_begin_recursion() / _end_recursion().
*
* Note that this doesn't prevent out-of-context callback executions
* to be run at that point, but due to the fact that _end_recursion()
* also locks the recursion_lock, it is at least guaranteed that they
* will have finished their execution before _end_recursion() returns.
*/
typedef struct _GstOMXRecMutex GstOMXRecMutex;
struct _GstOMXRecMutex {
/* The master lock */
GMutex *lock;
/* This lock is taken when recursing.
* The master lock must always be taken before this one,
* by the caller of _begin_recursion().
*/
GMutex *recursion_lock;
/* Indicates whether recursion is allowed.
* When it is allowed, _recursive_lock() takes
* the recursion_lock instead of the master lock.
* This variable is protected by both locks.
*/
volatile gboolean recursion_allowed;
};
void gst_omx_rec_mutex_init (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex);
G_END_DECLS
#endif /* __GST_OMX_REC_MUTEX_H__ */

View file

@ -1342,10 +1342,10 @@ gst_omx_video_dec_finish (GstBaseVideoDecoder * decoder)
GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers");
/* Insert a NULL into the queue to signal EOS */
g_mutex_lock (self->out_port->port_lock);
gst_omx_rec_mutex_lock (&self->out_port->port_lock);
g_queue_push_tail (self->out_port->pending_buffers, NULL);
g_cond_broadcast (self->out_port->port_cond);
g_mutex_unlock (self->out_port->port_lock);
gst_omx_rec_mutex_unlock (&self->out_port->port_lock);
return GST_BASE_VIDEO_DECODER_FLOW_DROPPED;
}

View file

@ -1392,10 +1392,10 @@ gst_omx_video_enc_finish (GstBaseVideoEncoder * encoder)
GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers");
/* Insert a NULL into the queue to signal EOS */
g_mutex_lock (self->out_port->port_lock);
gst_omx_rec_mutex_lock (&self->out_port->port_lock);
g_queue_push_tail (self->out_port->pending_buffers, NULL);
g_cond_broadcast (self->out_port->port_cond);
g_mutex_unlock (self->out_port->port_lock);
gst_omx_rec_mutex_unlock (&self->out_port->port_lock);
return GST_BASE_VIDEO_ENCODER_FLOW_DROPPED;
}