diff --git a/omx/Makefile.am b/omx/Makefile.am index 2407fd3fae..94fc485249 100644 --- a/omx/Makefile.am +++ b/omx/Makefile.am @@ -16,7 +16,8 @@ libgstopenmax_la_SOURCES = \ gstbasevideocodec.c \ gstbasevideodecoder.c \ gstbasevideoencoder.c \ - gstbasevideoutils.c + gstbasevideoutils.c \ + gstomxrecmutex.c noinst_HEADERS = \ gstomx.h \ @@ -34,7 +35,8 @@ noinst_HEADERS = \ gstbasevideocodec.h \ gstbasevideodecoder.h \ gstbasevideoencoder.h \ - gstbasevideoutils.h + gstbasevideoutils.h \ + gstomxrecmutex.h fixbaseclasses = \ -DGstBaseVideoCodec=OMXBaseVideoCodec \ diff --git a/omx/gstomx.c b/omx/gstomx.c index 8c33561582..afaab2045b 100644 --- a/omx/gstomx.c +++ b/omx/gstomx.c @@ -163,12 +163,12 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, * a state change to be finished */ GST_DEBUG_OBJECT (comp->parent, "State change to %d finished", nData2); - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_recursive_lock (&comp->state_lock); comp->state = (OMX_STATETYPE) nData2; if (comp->state == comp->pending_state) comp->pending_state = OMX_StateInvalid; g_cond_broadcast (comp->state_cond); - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_recursive_unlock (&comp->state_lock); break; } case OMX_CommandFlush:{ @@ -185,7 +185,7 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, * that the port is really flushed now and * we can continue */ - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_recursive_lock (&port->port_lock); /* FIXME: If this is ever called when the port * was not set to flushing something went * wrong but it happens for some reason. @@ -197,7 +197,7 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, GST_ERROR_OBJECT (comp->parent, "Port %u was not flushing", port->index); } - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_recursive_unlock (&port->port_lock); break; } case OMX_CommandPortEnable: @@ -212,10 +212,10 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, GST_DEBUG_OBJECT (comp->parent, "Port %u %s", port->index, (cmd == OMX_CommandPortEnable ? "enabled" : "disabled")); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_recursive_lock (&port->port_lock); port->enabled_changed = TRUE; g_cond_broadcast (port->port_cond); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_recursive_unlock (&port->port_lock); break; } @@ -265,17 +265,17 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, for (i = 0; i < n; i++) { GstOMXPort *port = g_ptr_array_index (comp->ports, i); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_recursive_lock (&port->port_lock); if (port_index == OMX_ALL || port_index == port->index) { port->settings_cookie++; if (port->port_def.eDir == OMX_DirOutput) outports = g_list_prepend (outports, port); g_cond_broadcast (port->port_cond); } - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_recursive_unlock (&port->port_lock); } - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_recursive_lock (&comp->state_lock); for (k = outports; k; k = k->next) { gboolean found = FALSE; @@ -293,7 +293,7 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, if (comp->pending_reconfigure_outports) g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1); - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_recursive_unlock (&comp->state_lock); g_list_free (outports); @@ -328,7 +328,7 @@ EmptyBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, /* Input buffer is empty again and can * be used to contain new input */ - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_recursive_lock (&port->port_lock); GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)", port->index, buf, buf->omx_buf->pBuffer); buf->used = FALSE; @@ -353,7 +353,7 @@ EmptyBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, g_queue_push_tail (port->pending_buffers, buf); g_cond_broadcast (port->port_cond); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_recursive_unlock (&port->port_lock); return OMX_ErrorNone; } @@ -378,13 +378,13 @@ FillBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, /* Output buffer contains output now or * the port was flushed */ - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_recursive_lock (&port->port_lock); GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)", port->index, buf, buf->omx_buf->pBuffer); buf->used = FALSE; g_queue_push_tail (port->pending_buffers, buf); g_cond_broadcast (port->port_cond); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_recursive_unlock (&port->port_lock); return OMX_ErrorNone; } @@ -427,7 +427,7 @@ gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata) comp->n_in_ports = 0; comp->n_out_ports = 0; - comp->state_lock = g_mutex_new (); + gst_omx_rec_mutex_init (&comp->state_lock); comp->state_cond = g_cond_new (); comp->pending_state = OMX_StateInvalid; comp->last_error = OMX_ErrorNone; @@ -475,7 +475,7 @@ gst_omx_component_free (GstOMXComponent * comp) gst_omx_port_deallocate_buffers (port); - g_mutex_free (port->port_lock); + gst_omx_rec_mutex_clear (&port->port_lock); g_cond_free (port->port_cond); g_queue_free (port->pending_buffers); @@ -493,7 +493,7 @@ gst_omx_component_free (GstOMXComponent * comp) gst_omx_core_release (comp->core); g_cond_free (comp->state_cond); - g_mutex_free (comp->state_lock); + gst_omx_rec_mutex_clear (&comp->state_lock); gst_object_unref (comp->parent); @@ -508,7 +508,7 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state) g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined); - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); old_state = comp->state; GST_DEBUG_OBJECT (comp->parent, "Setting state from %d to %d", old_state, state); @@ -534,15 +534,13 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state) g_cond_broadcast (comp->state_cond); } - /* Release lock because the command could call into the callbacks, - * which take the lock again */ - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_begin_recursion (&comp->state_lock); err = OMX_SendCommand (comp->handle, OMX_CommandStateSet, state, NULL); - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_end_recursion (&comp->state_lock); /* No need to check if anything has changed here */ done: - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -564,7 +562,7 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout) GST_DEBUG_OBJECT (comp->parent, "Getting state"); - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); ret = comp->state; if (comp->pending_state == OMX_StateInvalid) goto done; @@ -592,7 +590,8 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout) } do { - signalled = g_cond_timed_wait (comp->state_cond, comp->state_lock, timeval); + signalled = + g_cond_timed_wait (comp->state_cond, comp->state_lock.lock, timeval); } while (signalled && comp->last_error == OMX_ErrorNone && comp->pending_state != OMX_StateInvalid); @@ -615,7 +614,7 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout) } done: - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); /* If we waited and timed out this component is unusable now */ if (!signalled) @@ -662,7 +661,7 @@ gst_omx_component_add_port (GstOMXComponent * comp, guint32 index) port->port_def = port_def; - port->port_lock = g_mutex_new (); + gst_omx_rec_mutex_init (&port->port_lock); port->port_cond = g_cond_new (); port->pending_buffers = g_queue_new (); port->flushing = TRUE; @@ -735,14 +734,14 @@ gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err) GST_ERROR_OBJECT (comp->parent, "Setting last error: %s (0x%08x)", gst_omx_error_to_string (err), err); - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); /* We only set the first error ever from which * we can't recover anymore. */ if (comp->last_error == OMX_ErrorNone) comp->last_error = err; g_cond_broadcast (comp->state_cond); - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); /* Now notify all ports, no locking needed * here because the ports are allocated in the @@ -753,9 +752,9 @@ gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err) for (i = 0; i < n; i++) { GstOMXPort *tmp = g_ptr_array_index (comp->ports, i); - g_mutex_lock (tmp->port_lock); + gst_omx_rec_mutex_lock (&tmp->port_lock); g_cond_broadcast (tmp->port_cond); - g_mutex_unlock (tmp->port_lock); + gst_omx_rec_mutex_unlock (&tmp->port_lock); } } @@ -766,9 +765,9 @@ gst_omx_component_get_last_error (GstOMXComponent * comp) g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined); - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); err = comp->last_error; - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); GST_DEBUG_OBJECT (comp->parent, "Returning last error: %s (0x%08x)", gst_omx_error_to_string (err), err); @@ -882,7 +881,7 @@ gst_omx_port_update_port_definition (GstOMXPort * port, comp = port->comp; - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); if (port_def) err = gst_omx_component_set_parameter (comp, OMX_IndexParamPortDefinition, @@ -893,7 +892,7 @@ gst_omx_port_update_port_definition (GstOMXPort * port, GST_DEBUG_OBJECT (comp->parent, "Updated port %u definition: %s (0x%08x)", port->index, gst_omx_error_to_string (err), err); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); return (err == OMX_ErrorNone); } @@ -915,7 +914,7 @@ gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf) GST_DEBUG_OBJECT (comp->parent, "Acquiring buffer from port %u", port->index); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); retry: @@ -940,16 +939,16 @@ retry: */ if (port->port_def.eDir == OMX_DirInput) { if (g_atomic_int_get (&comp->have_pending_reconfigure_outports)) { - g_mutex_unlock (port->port_lock); - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); while (g_atomic_int_get (&comp->have_pending_reconfigure_outports) && (err = comp->last_error) == OMX_ErrorNone && !port->flushing) { GST_DEBUG_OBJECT (comp->parent, "Waiting for output ports to reconfigure"); - g_cond_wait (comp->state_cond, comp->state_lock); + g_cond_wait (comp->state_cond, comp->state_lock.lock); } - g_mutex_unlock (comp->state_lock); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); + gst_omx_rec_mutex_lock (&port->port_lock); goto retry; } @@ -1004,7 +1003,7 @@ retry: */ if (g_queue_is_empty (port->pending_buffers)) { GST_DEBUG_OBJECT (comp->parent, "Queue of port %u is empty", port->index); - g_cond_wait (port->port_cond, port->port_lock); + g_cond_wait (port->port_cond, port->port_lock.lock); } else { GST_DEBUG_OBJECT (comp->parent, "Port %u has pending buffers", port->pending_buffers); @@ -1017,7 +1016,7 @@ retry: goto retry; done: - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); if (_buf) { g_assert (_buf == _buf->omx_buf->pAppPrivate); @@ -1045,7 +1044,7 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf) GST_DEBUG_OBJECT (comp->parent, "Releasing buffer %p (%p) to port %u", buf, buf->omx_buf->pBuffer, port->index); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); if (port->port_def.eDir == OMX_DirInput) { /* Reset all flags, some implementations don't @@ -1076,17 +1075,20 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf) /* FIXME: What if the settings cookies don't match? */ buf->used = TRUE; + + gst_omx_rec_mutex_begin_recursion (&port->port_lock); if (port->port_def.eDir == OMX_DirInput) { err = OMX_EmptyThisBuffer (comp->handle, buf->omx_buf); } else { err = OMX_FillThisBuffer (comp->handle, buf->omx_buf); } + gst_omx_rec_mutex_end_recursion (&port->port_lock); GST_DEBUG_OBJECT (comp->parent, "Released buffer %p to port %u: %s (0x%08x)", buf, port->index, gst_omx_error_to_string (err), err); done: - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); return err; } @@ -1103,7 +1105,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) GST_DEBUG_OBJECT (comp->parent, "Setting port %d to %sflushing", port->index, (flush ? "" : "not ")); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); if (! !flush == ! !port->flushing) { GST_DEBUG_OBJECT (comp->parent, "Port %u was %sflushing already", port->index, (flush ? "" : "not ")); @@ -1116,17 +1118,17 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) goto done; } - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); if (comp->state != OMX_StateIdle && comp->state != OMX_StateExecuting) { GST_DEBUG_OBJECT (comp->parent, "Component is in wrong state: %d", comp->state); err = OMX_ErrorUndefined; - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); goto done; } - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); port->flushing = flush; if (flush) { @@ -1143,17 +1145,16 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) * additionally check if the condition they're waiting * for is true after waking up. */ - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); g_cond_broadcast (comp->state_cond); - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); /* Now flush the port */ port->flushed = FALSE; - /* Unlock because this could call one of the callbacks which - * also take the lock */ - g_mutex_unlock (port->port_lock); + + gst_omx_rec_mutex_begin_recursion (&port->port_lock); err = OMX_SendCommand (comp->handle, OMX_CommandFlush, port->index, NULL); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_end_recursion (&port->port_lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -1186,7 +1187,8 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) last_error = OMX_ErrorNone; while (signalled && last_error == OMX_ErrorNone && !port->flushed && port->buffers->len > g_queue_get_length (port->pending_buffers)) { - signalled = g_cond_timed_wait (port->port_cond, port->port_lock, timeval); + signalled = + g_cond_timed_wait (port->port_cond, port->port_lock.lock, timeval); last_error = gst_omx_component_get_last_error (comp); } @@ -1221,9 +1223,11 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) * valid anymore after the buffer was consumed */ buf->omx_buf->nFlags = 0; - g_mutex_unlock (port->port_lock); + + gst_omx_rec_mutex_begin_recursion (&port->port_lock); err = OMX_FillThisBuffer (comp->handle, buf->omx_buf); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_end_recursion (&port->port_lock); + if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Failed to pass buffer %p (%p) to port %u: %s (0x%08x)", buf, @@ -1240,7 +1244,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) done: GST_DEBUG_OBJECT (comp->parent, "Set port %u to %sflushing: %s (0x%08x)", port->index, (flush ? "" : "not "), gst_omx_error_to_string (err), err); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); return err; @@ -1250,9 +1254,9 @@ error: * set_last_error() needs all port locks. * This is safe here because we're just going * to error out anyway */ - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); gst_omx_component_set_last_error (comp, err); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); goto done; } } @@ -1267,9 +1271,9 @@ gst_omx_port_is_flushing (GstOMXPort * port) comp = port->comp; - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); flushing = port->flushing; - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing: %d", port->index, flushing); @@ -1338,9 +1342,12 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port) buf->settings_cookie = port->settings_cookie; g_ptr_array_add (port->buffers, buf); + gst_omx_rec_mutex_begin_recursion (&port->port_lock); err = OMX_AllocateBuffer (comp->handle, &buf->omx_buf, port->index, buf, port->port_def.nBufferSize); + gst_omx_rec_mutex_end_recursion (&port->port_lock); + if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Failed to allocate buffer for port %u: %s (0x%08x)", port->index, @@ -1369,9 +1376,9 @@ error: * set_last_error() needs all port locks. * This is safe here because we're just going * to error out anyway */ - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); gst_omx_component_set_last_error (comp, err); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); goto done; } } @@ -1383,9 +1390,9 @@ gst_omx_port_allocate_buffers (GstOMXPort * port) g_return_val_if_fail (port != NULL, OMX_ErrorUndefined); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); err = gst_omx_port_allocate_buffers_unlocked (port); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); return err; } @@ -1439,7 +1446,11 @@ gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port) buf->omx_buf->pAppPrivate = NULL; GST_DEBUG_OBJECT (comp->parent, "Deallocating buffer %p (%p)", buf, buf->omx_buf->pBuffer); + + gst_omx_rec_mutex_begin_recursion (&port->port_lock); tmp = OMX_FreeBuffer (comp->handle, port->index, buf->omx_buf); + gst_omx_rec_mutex_end_recursion (&port->port_lock); + if (tmp != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Failed to deallocate buffer %d of port %u: %s (0x%08x)", i, @@ -1473,9 +1484,9 @@ gst_omx_port_deallocate_buffers (GstOMXPort * port) g_return_val_if_fail (port != NULL, OMX_ErrorUndefined); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); err = gst_omx_port_deallocate_buffers_unlocked (port); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); return err; } @@ -1516,7 +1527,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) port->flushing = TRUE; } - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_begin_recursion (&port->port_lock); if (enabled) err = OMX_SendCommand (comp->handle, OMX_CommandPortEnable, port->index, @@ -1525,7 +1536,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) err = OMX_SendCommand (comp->handle, OMX_CommandPortDisable, port->index, NULL); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_end_recursion (&port->port_lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -1555,7 +1566,8 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) last_error = OMX_ErrorNone; while (signalled && last_error == OMX_ErrorNone && (port->buffers && port->buffers->len > g_queue_get_length (port->pending_buffers))) { - signalled = g_cond_timed_wait (port->port_cond, port->port_lock, timeval); + signalled = + g_cond_timed_wait (port->port_cond, port->port_lock.lock, timeval); last_error = gst_omx_component_get_last_error (comp); } @@ -1594,7 +1606,8 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) &port->port_def); while (signalled && last_error == OMX_ErrorNone && (! !port->port_def.bEnabled != ! !enabled || !port->enabled_changed)) { - signalled = g_cond_timed_wait (port->port_cond, port->port_lock, timeval); + signalled = + g_cond_timed_wait (port->port_cond, port->port_lock.lock, timeval); last_error = gst_omx_component_get_last_error (comp); gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition, &port->port_def); @@ -1634,7 +1647,11 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) * valid anymore after the buffer was consumed */ buf->omx_buf->nFlags = 0; + + gst_omx_rec_mutex_begin_recursion (&port->port_lock); err = OMX_FillThisBuffer (comp->handle, buf->omx_buf); + gst_omx_rec_mutex_end_recursion (&port->port_lock); + if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Failed to pass buffer %p (%p) to port %u: %s (0x%08x)", buf, @@ -1661,9 +1678,9 @@ error: * set_last_error() needs all port locks. * This is safe here because we're just going * to error out anyway */ - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); gst_omx_component_set_last_error (comp, err); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); goto done; } } @@ -1675,9 +1692,9 @@ gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled) g_return_val_if_fail (port != NULL, OMX_ErrorUndefined); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); err = gst_omx_port_set_enabled_unlocked (port, enabled); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); return err; } @@ -1692,11 +1709,11 @@ gst_omx_port_is_enabled (GstOMXPort * port) comp = port->comp; - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition, &port->port_def); enabled = port->port_def.bEnabled; - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); GST_DEBUG_OBJECT (comp->parent, "Port %u is enabled: %d", port->index, enabled); @@ -1716,7 +1733,7 @@ gst_omx_port_reconfigure (GstOMXPort * port) GST_DEBUG_OBJECT (comp->parent, "Reconfiguring port %u", port->index); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); if (!port->settings_changed) goto done; @@ -1744,7 +1761,7 @@ gst_omx_port_reconfigure (GstOMXPort * port) if (port->port_def.eDir == OMX_DirOutput) { GList *l; - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); for (l = comp->pending_reconfigure_outports; l; l = l->next) { if (l->data == (gpointer) port) { comp->pending_reconfigure_outports = @@ -1756,14 +1773,14 @@ gst_omx_port_reconfigure (GstOMXPort * port) g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0); g_cond_broadcast (comp->state_cond); } - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); } done: GST_DEBUG_OBJECT (comp->parent, "Reconfigured port %u: %s (0x%08x)", port->index, gst_omx_error_to_string (err), err); - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); return err; } @@ -1780,7 +1797,7 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u %s", port->index, (start ? "start" : "stsop")); - g_mutex_lock (port->port_lock); + gst_omx_rec_mutex_lock (&port->port_lock); if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) goto done; @@ -1794,7 +1811,7 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) GList *l; if (start) { - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); for (l = comp->pending_reconfigure_outports; l; l = l->next) { if (l->data == (gpointer) port) break; @@ -1805,9 +1822,9 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) g_list_prepend (comp->pending_reconfigure_outports, port); g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1); } - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); } else { - g_mutex_lock (comp->state_lock); + gst_omx_rec_mutex_lock (&comp->state_lock); for (l = comp->pending_reconfigure_outports; l; l = l->next) { if (l->data == (gpointer) port) { comp->pending_reconfigure_outports = @@ -1819,13 +1836,13 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0); g_cond_broadcast (comp->state_cond); } - g_mutex_unlock (comp->state_lock); + gst_omx_rec_mutex_unlock (&comp->state_lock); } } done: - g_mutex_unlock (port->port_lock); + gst_omx_rec_mutex_unlock (&port->port_lock); GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u: %s (0x%08x)", port->index, gst_omx_error_to_string (err), err); diff --git a/omx/gstomx.h b/omx/gstomx.h index df91ca1910..15026d306d 100644 --- a/omx/gstomx.h +++ b/omx/gstomx.h @@ -27,6 +27,8 @@ #include #include +#include "gstomxrecmutex.h" + G_BEGIN_DECLS #define GST_OMX_INIT_STRUCT(st) G_STMT_START { \ @@ -138,7 +140,7 @@ struct _GstOMXPort { * Note: This lock must always be taken before * the component's state lock if both are needed! */ - GMutex *port_lock; + GstOMXRecMutex port_lock; GCond *port_cond; OMX_PARAM_PORTDEFINITIONTYPE port_def; GPtrArray *buffers; /* Contains GstOMXBuffer* */ @@ -171,7 +173,7 @@ struct _GstOMXComponent { * pending_reconfigure_outports. * Signalled if one of them changes */ - GMutex *state_lock; + GstOMXRecMutex state_lock; GCond *state_cond; OMX_STATETYPE state; /* OMX_StateInvalid if no pending state */ diff --git a/omx/gstomxaudioenc.c b/omx/gstomxaudioenc.c index de57edffa6..d87e6873fc 100644 --- a/omx/gstomxaudioenc.c +++ b/omx/gstomxaudioenc.c @@ -885,10 +885,10 @@ gst_omx_audio_enc_sink_event (GstAudioEncoder * encoder, GstEvent * event) GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers"); /* Insert a NULL into the queue to signal EOS */ - g_mutex_lock (self->out_port->port_lock); + gst_omx_rec_mutex_lock (&self->out_port->port_lock); g_queue_push_tail (self->out_port->pending_buffers, NULL); g_cond_broadcast (self->out_port->port_cond); - g_mutex_unlock (self->out_port->port_lock); + gst_omx_rec_mutex_unlock (&self->out_port->port_lock); return TRUE; } diff --git a/omx/gstomxrecmutex.c b/omx/gstomxrecmutex.c new file mode 100644 index 0000000000..612668bef4 --- /dev/null +++ b/omx/gstomxrecmutex.c @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2012, Collabora Ltd. + * Author: George Kiagiadakis + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstomxrecmutex.h" + +void +gst_omx_rec_mutex_init (GstOMXRecMutex * mutex) +{ + mutex->lock = g_mutex_new (); + mutex->recursion_lock = g_mutex_new (); + mutex->recursion_allowed = FALSE; +} + +void +gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex) +{ + g_mutex_free (mutex->lock); + g_mutex_free (mutex->recursion_lock); +} + +void +gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex) +{ + g_mutex_lock (mutex->lock); +} + +void +gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex) +{ + g_mutex_unlock (mutex->lock); +} + +/* must be called with mutex->lock taken */ +void +gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex) +{ + g_mutex_lock (mutex->recursion_lock); + g_assert (mutex->recursion_allowed == FALSE); + mutex->recursion_allowed = TRUE; + g_mutex_unlock (mutex->recursion_lock); +} + +/* must be called with mutex->lock taken */ +void +gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex) +{ + g_mutex_lock (mutex->recursion_lock); + g_assert (mutex->recursion_allowed == TRUE); + mutex->recursion_allowed = FALSE; + g_mutex_unlock (mutex->recursion_lock); +} + +void +gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex) +{ + g_mutex_lock (mutex->recursion_lock); + if (!mutex->recursion_allowed) { + /* no recursion allowed, lock the proper mutex */ + g_mutex_unlock (mutex->recursion_lock); + g_mutex_lock (mutex->lock); + } +} + +void +gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex) +{ + /* It is safe to check recursion_allowed here because + * we hold at least one of the two locks and + * either lock protects it from being changed. + */ + if (mutex->recursion_allowed) { + g_mutex_unlock (mutex->recursion_lock); + } else { + g_mutex_unlock (mutex->lock); + } +} diff --git a/omx/gstomxrecmutex.h b/omx/gstomxrecmutex.h new file mode 100644 index 0000000000..b1eb37331f --- /dev/null +++ b/omx/gstomxrecmutex.h @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2012, Collabora Ltd. + * Author: George Kiagiadakis + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#ifndef __GST_OMX_REC_MUTEX_H__ +#define __GST_OMX_REC_MUTEX_H__ + +#include + +G_BEGIN_DECLS + +/* + * This is a recursive mutex implementation that serves a very specific + * purpose; it is used to allow OpenMAX callbacks to be run in the context + * of some OpenMAX function call while the calling function is holding the + * master lock. + * + * According to the OpenMAX specification, we have 2 possible ways that + * callbacks may be called. First, we have out-of-context calls, which means + * that callbacks are called from a different thread at any point in time. + * In this case, callbacks must take the appropriate lock to protect the data + * that they are changing. Second, we have in-context calls, which means + * that callbacks are called when we call some OpenMAX function, before this + * function returns. In this case, we need to allow the callback to run + * without taking any locks that the caller of the OpenMAX function is holding. + * + * This can be solved by a recusrive mutex. However, a normal GRecMutex is + * not enough because it allows being locked multiple times only from + * the same thread. Unfortunatly, what we see in Broadcom's implementation, + * for instance, OpenMAX callbacks may be in-context, but from a different + * thread. This is achieved like this: + * + * - OMX_Foo is called + * - OMX_Foo waits on a condition + * - The callback is executed in a different thread + * - When the callback returns, its calling function + * signals the condition that OMX_Foo waits on + * - OMX_Foo wakes up and returns + * + * This recursive mutex implementation attempts to fix this problem. + * This is achieved like this: All functions lock this mutex normally + * using gst_omx_rec_mutex_lock() / _unlock(). These functions + * effectively lock the master mutex and they are identical in behavior + * with g_mutex_lock() / _unlock(). When a function that has already + * locked this mutex is about to call some OpenMAX function, it must + * call gst_omx_rec_mutex_begin_recursion() to indicate that recursive + * locking is now allowed, and similarly, call gst_omx_rec_mutex_end_recursion() + * after the OpenMAX function has returned to indicate that no recursive + * locking is allowed from this point on. Callbacks should lock the + * mutex using gst_omx_rec_mutex_recursive_lock() / _recursive_unlock(). + * These two functions, depending on whether recursion is allowed + * will take/release either the master lock or the recursion_lock. + * Effectively, this allows callbacks to run in the context any code between + * calls to gst_omx_rec_mutex_begin_recursion() / _end_recursion(). + * + * Note that this doesn't prevent out-of-context callback executions + * to be run at that point, but due to the fact that _end_recursion() + * also locks the recursion_lock, it is at least guaranteed that they + * will have finished their execution before _end_recursion() returns. + */ +typedef struct _GstOMXRecMutex GstOMXRecMutex; + +struct _GstOMXRecMutex { + /* The master lock */ + GMutex *lock; + + /* This lock is taken when recursing. + * The master lock must always be taken before this one, + * by the caller of _begin_recursion(). + */ + GMutex *recursion_lock; + + /* Indicates whether recursion is allowed. + * When it is allowed, _recursive_lock() takes + * the recursion_lock instead of the master lock. + * This variable is protected by both locks. + */ + volatile gboolean recursion_allowed; +}; + +void gst_omx_rec_mutex_init (GstOMXRecMutex * mutex); +void gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex); + +void gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex); +void gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex); + +void gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex); +void gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex); + +void gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex); +void gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex); + +G_END_DECLS + +#endif /* __GST_OMX_REC_MUTEX_H__ */ diff --git a/omx/gstomxvideodec.c b/omx/gstomxvideodec.c index 9efe90aed7..45d2b225bf 100644 --- a/omx/gstomxvideodec.c +++ b/omx/gstomxvideodec.c @@ -1342,10 +1342,10 @@ gst_omx_video_dec_finish (GstBaseVideoDecoder * decoder) GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers"); /* Insert a NULL into the queue to signal EOS */ - g_mutex_lock (self->out_port->port_lock); + gst_omx_rec_mutex_lock (&self->out_port->port_lock); g_queue_push_tail (self->out_port->pending_buffers, NULL); g_cond_broadcast (self->out_port->port_cond); - g_mutex_unlock (self->out_port->port_lock); + gst_omx_rec_mutex_unlock (&self->out_port->port_lock); return GST_BASE_VIDEO_DECODER_FLOW_DROPPED; } diff --git a/omx/gstomxvideoenc.c b/omx/gstomxvideoenc.c index 78df4c501b..c12697d26c 100644 --- a/omx/gstomxvideoenc.c +++ b/omx/gstomxvideoenc.c @@ -1392,10 +1392,10 @@ gst_omx_video_enc_finish (GstBaseVideoEncoder * encoder) GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers"); /* Insert a NULL into the queue to signal EOS */ - g_mutex_lock (self->out_port->port_lock); + gst_omx_rec_mutex_lock (&self->out_port->port_lock); g_queue_push_tail (self->out_port->pending_buffers, NULL); g_cond_broadcast (self->out_port->port_cond); - g_mutex_unlock (self->out_port->port_lock); + gst_omx_rec_mutex_unlock (&self->out_port->port_lock); return GST_BASE_VIDEO_ENCODER_FLOW_DROPPED; }