diff --git a/omx/Makefile.am b/omx/Makefile.am index 02961284ff..181ce0e521 100644 --- a/omx/Makefile.am +++ b/omx/Makefile.am @@ -12,8 +12,7 @@ libgstopenmax_la_SOURCES = \ gstomxmpeg4videoenc.c \ gstomxh264enc.c \ gstomxh263enc.c \ - gstomxaacenc.c \ - gstomxrecmutex.c + gstomxaacenc.c noinst_HEADERS = \ gstomx.h \ @@ -27,8 +26,7 @@ noinst_HEADERS = \ gstomxmpeg4videoenc.h \ gstomxh264enc.h \ gstomxh263enc.h \ - gstomxaacenc.h \ - gstomxrecmutex.h + gstomxaacenc.h libgstopenmax_la_CFLAGS = \ -DGST_USE_UNSTABLE_API=1 \ diff --git a/omx/gstomx.c b/omx/gstomx.c index 1d65f7c31a..8c30feccb4 100644 --- a/omx/gstomx.c +++ b/omx/gstomx.c @@ -1,6 +1,8 @@ /* * Copyright (C) 2011, Hewlett-Packard Development Company, L.P. * Author: Sebastian Dröge , Collabora Ltd. + * Copyright (C) 2013, Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -174,6 +176,187 @@ gst_omx_core_release (GstOMXCore * core) G_UNLOCK (core_handles); } +/* NOTE: comp->messages_lock will be used */ +static void +gst_omx_component_flush_messages (GstOMXComponent * comp) +{ + GstOMXMessage *msg; + + g_mutex_lock (&comp->messages_lock); + while ((msg = g_queue_pop_head (&comp->messages))) { + g_slice_free (GstOMXMessage, msg); + } + g_mutex_unlock (&comp->messages_lock); +} + +/* NOTE: Call with comp->lock, comp->messages_lock will be used */ +static void +gst_omx_component_handle_messages (GstOMXComponent * comp) +{ + GstOMXMessage *msg; + + g_mutex_lock (&comp->messages_lock); + + while ((msg = g_queue_pop_head (&comp->messages))) { + switch (msg->type) { + case GST_OMX_MESSAGE_STATE_SET:{ + GST_DEBUG_OBJECT (comp->parent, "State change to %d finished", + msg->content.state_set.state); + comp->state = msg->content.state_set.state; + if (comp->state == comp->pending_state) + comp->pending_state = OMX_StateInvalid; + break; + } + case GST_OMX_MESSAGE_FLUSH:{ + GstOMXPort *port = NULL; + OMX_U32 index = msg->content.flush.port; + + port = gst_omx_component_get_port (comp, index); + if (!port) + break; + + GST_DEBUG_OBJECT (comp->parent, "Port %u flushed", port->index); + + if (port->flushing) { + port->flushed = TRUE; + } else { + GST_ERROR_OBJECT (comp->parent, "Port %u was not flushing", + port->index); + } + + break; + } + case GST_OMX_MESSAGE_ERROR:{ + OMX_ERRORTYPE error = msg->content.error.error; + + if (error == OMX_ErrorNone) + break; + + GST_ERROR_OBJECT (comp->parent, "Got error: %s (0x%08x)", + gst_omx_error_to_string (error), error); + + /* We only set the first error ever from which + * we can't recover anymore. + */ + if (comp->last_error == OMX_ErrorNone) + comp->last_error = error; + g_cond_broadcast (&comp->messages_cond); + + break; + } + case GST_OMX_MESSAGE_PORT_ENABLE:{ + GstOMXPort *port = NULL; + OMX_U32 index = msg->content.port_enable.port; + OMX_BOOL enable = msg->content.port_enable.enable; + + port = gst_omx_component_get_port (comp, index); + if (!port) + break; + + GST_DEBUG_OBJECT (comp->parent, "Port %u %s", port->index, + (enable ? "enabled" : "disabled")); + + port->enabled_changed = TRUE; + break; + } + case GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED:{ + gint i, n; + OMX_U32 index = msg->content.port_settings_changed.port; + GList *outports = NULL, *l, *k; + + GST_DEBUG_OBJECT (comp->parent, "Settings changed (port %u)", index); + + /* FIXME: This probably can be done better */ + + /* Now update the ports' states */ + n = (comp->ports ? comp->ports->len : 0); + for (i = 0; i < n; i++) { + GstOMXPort *port = g_ptr_array_index (comp->ports, i); + + if (index == OMX_ALL || index == port->index) { + port->settings_cookie++; + if (port->port_def.eDir == OMX_DirOutput) + outports = g_list_prepend (outports, port); + } + } + + for (k = outports; k; k = k->next) { + gboolean found = FALSE; + + for (l = comp->pending_reconfigure_outports; l; l = l->next) { + if (l->data == k->data) { + found = TRUE; + break; + } + } + + if (!found) + comp->pending_reconfigure_outports = + g_list_prepend (comp->pending_reconfigure_outports, k->data); + } + + if (comp->pending_reconfigure_outports) + g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1); + + g_list_free (outports); + + break; + } + case GST_OMX_MESSAGE_BUFFER_DONE:{ + GstOMXBuffer *buf = msg->content.buffer_done.buffer->pAppPrivate; + GstOMXPort *port; + GstOMXComponent *comp; + + port = buf->port; + comp = port->comp; + + if (msg->content.buffer_done.empty) { + /* Input buffer is empty again and can be used to contain new input */ + GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)", + port->index, buf, buf->omx_buf->pBuffer); + + /* XXX: Some OMX implementations don't reset nOffset + * when the complete buffer is emptied but instead + * only reset nFilledLen. We reset nOffset to 0 + * if nFilledLen == 0, which is safe to do because + * the offset *must* be 0 if the buffer is not + * filled at all. + * + * Seen in QCOM's OMX implementation. + */ + if (buf->omx_buf->nFilledLen == 0) + buf->omx_buf->nOffset = 0; + + /* Reset all flags, some implementations don't + * reset them themselves and the flags are not + * valid anymore after the buffer was consumed + */ + buf->omx_buf->nFlags = 0; + } else { + /* Output buffer contains output now or + * the port was flushed */ + GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)", + port->index, buf, buf->omx_buf->pBuffer); + } + + buf->used = FALSE; + + g_queue_push_tail (&port->pending_buffers, buf); + + break; + } + default:{ + g_assert_not_reached (); + break; + } + } + + g_slice_free (GstOMXMessage, msg); + } + + g_mutex_unlock (&comp->messages_lock); +} + static OMX_ERRORTYPE EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, OMX_U32 nData1, OMX_U32 nData2, OMX_PTR pEventData) @@ -189,64 +372,49 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, switch (cmd) { case OMX_CommandStateSet:{ - /* Notify everything that waits for - * a state change to be finished */ + GstOMXMessage *msg = g_slice_new (GstOMXMessage); + + msg->type = GST_OMX_MESSAGE_STATE_SET; + msg->content.state_set.state = nData2; + GST_DEBUG_OBJECT (comp->parent, "State change to %d finished", - nData2); - gst_omx_rec_mutex_recursive_lock (&comp->state_lock); - comp->state = (OMX_STATETYPE) nData2; - if (comp->state == comp->pending_state) - comp->pending_state = OMX_StateInvalid; - g_cond_broadcast (&comp->state_cond); - gst_omx_rec_mutex_recursive_unlock (&comp->state_lock); + msg->content.state_set.state); + + g_mutex_lock (&comp->messages_lock); + g_queue_push_tail (&comp->messages, msg); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); break; } case OMX_CommandFlush:{ - GstOMXPort *port = NULL; - OMX_U32 index = nData2; + GstOMXMessage *msg = g_slice_new (GstOMXMessage); - port = gst_omx_component_get_port (comp, index); - if (!port) - break; + msg->type = GST_OMX_MESSAGE_FLUSH; + msg->content.flush.port = nData2; + GST_DEBUG_OBJECT (comp->parent, "Port %u flushed", + msg->content.flush.port); - GST_DEBUG_OBJECT (comp->parent, "Port %u flushed", port->index); - - /* Now notify gst_omx_port_set_flushing() - * that the port is really flushed now and - * we can continue - */ - gst_omx_rec_mutex_recursive_lock (&port->port_lock); - /* FIXME: If this is ever called when the port - * was not set to flushing something went - * wrong but it happens for some reason. - */ - if (port->flushing) { - port->flushed = TRUE; - g_cond_broadcast (&port->port_cond); - } else { - GST_ERROR_OBJECT (comp->parent, "Port %u was not flushing", - port->index); - } - gst_omx_rec_mutex_recursive_unlock (&port->port_lock); + g_mutex_lock (&comp->messages_lock); + g_queue_push_tail (&comp->messages, msg); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); break; } case OMX_CommandPortEnable: case OMX_CommandPortDisable:{ - GstOMXPort *port = NULL; - OMX_U32 index = nData2; + GstOMXMessage *msg = g_slice_new (GstOMXMessage); - port = gst_omx_component_get_port (comp, index); - if (!port) - break; - - GST_DEBUG_OBJECT (comp->parent, "Port %u %s", port->index, - (cmd == OMX_CommandPortEnable ? "enabled" : "disabled")); - - gst_omx_rec_mutex_recursive_lock (&port->port_lock); - port->enabled_changed = TRUE; - g_cond_broadcast (&port->port_cond); - gst_omx_rec_mutex_recursive_unlock (&port->port_lock); + msg->type = GST_OMX_MESSAGE_PORT_ENABLE; + msg->content.port_enable.port = nData2; + msg->content.port_enable.enable = (cmd == OMX_CommandPortEnable); + GST_DEBUG_OBJECT (comp->parent, "Port %u %s", + msg->content.port_enable.port, + (msg->content.port_enable.enable ? "enabled" : "disabled")); + g_mutex_lock (&comp->messages_lock); + g_queue_push_tail (&comp->messages, msg); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); break; } default: @@ -256,76 +424,54 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, } case OMX_EventError: { - OMX_ERRORTYPE err = nData1; + GstOMXMessage *msg; - if (err == OMX_ErrorNone) + /* Yes, this really happens... */ + if (nData1 == OMX_ErrorNone) break; + msg = g_slice_new (GstOMXMessage); + + msg->type = GST_OMX_MESSAGE_ERROR; + msg->content.error.error = nData1; GST_ERROR_OBJECT (comp->parent, "Got error: %s (0x%08x)", - gst_omx_error_to_string (err), err); - - /* Error events are always fatal */ - gst_omx_component_set_last_error (comp, err); + gst_omx_error_to_string (msg->content.error.error), + msg->content.error.error); + g_mutex_lock (&comp->messages_lock); + g_queue_push_tail (&comp->messages, msg); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); break; } case OMX_EventPortSettingsChanged: { - gint i, n; - guint32 port_index; - GList *outports = NULL, *l, *k; + GstOMXMessage *msg = g_slice_new (GstOMXMessage); + OMX_U32 index; if (!(comp->hacks & GST_OMX_HACK_EVENT_PORT_SETTINGS_CHANGED_NDATA_PARAMETER_SWAP)) { - port_index = nData1; + index = nData1; } else { - port_index = nData2; + index = nData2; } - if (port_index == 0 + + if (index == 0 && (comp->hacks & GST_OMX_HACK_EVENT_PORT_SETTINGS_CHANGED_PORT_0_TO_1)) - port_index = 1; + index = 1; + + msg->type = GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED; + msg->content.port_settings_changed.port = index; GST_DEBUG_OBJECT (comp->parent, "Settings changed (port index: %d)", - port_index); + msg->content.port_settings_changed.port); - /* Now update the ports' states */ - n = (comp->ports ? comp->ports->len : 0); - for (i = 0; i < n; i++) { - GstOMXPort *port = g_ptr_array_index (comp->ports, i); - - gst_omx_rec_mutex_recursive_lock (&port->port_lock); - if (port_index == OMX_ALL || port_index == port->index) { - port->settings_cookie++; - if (port->port_def.eDir == OMX_DirOutput) - outports = g_list_prepend (outports, port); - g_cond_broadcast (&port->port_cond); - } - gst_omx_rec_mutex_recursive_unlock (&port->port_lock); - } - - gst_omx_rec_mutex_recursive_lock (&comp->state_lock); - for (k = outports; k; k = k->next) { - gboolean found = FALSE; - - for (l = comp->pending_reconfigure_outports; l; l = l->next) { - if (l->data == k->data) { - found = TRUE; - break; - } - } - - if (!found) - comp->pending_reconfigure_outports = - g_list_prepend (comp->pending_reconfigure_outports, k->data); - } - - if (comp->pending_reconfigure_outports) - g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1); - gst_omx_rec_mutex_recursive_unlock (&comp->state_lock); - - g_list_free (outports); + g_mutex_lock (&comp->messages_lock); + g_queue_push_tail (&comp->messages, msg); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); break; } @@ -342,48 +488,34 @@ static OMX_ERRORTYPE EmptyBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_BUFFERHEADERTYPE * pBuffer) { - GstOMXBuffer *buf = pBuffer->pAppPrivate; - GstOMXPort *port; + GstOMXBuffer *buf; GstOMXComponent *comp; + GstOMXMessage *msg; - if (buf == NULL) { + buf = pBuffer->pAppPrivate; + if (!buf) { GST_ERROR ("Have unknown or deallocated buffer %p", pBuffer); return OMX_ErrorNone; } - port = buf->port; - comp = port->comp; - g_assert (buf->omx_buf == pBuffer); - /* Input buffer is empty again and can - * be used to contain new input */ - gst_omx_rec_mutex_recursive_lock (&port->port_lock); + comp = buf->port->comp; + + msg = g_slice_new (GstOMXMessage); + msg->type = GST_OMX_MESSAGE_BUFFER_DONE; + msg->content.buffer_done.component = hComponent; + msg->content.buffer_done.app_data = pAppData; + msg->content.buffer_done.buffer = pBuffer; + msg->content.buffer_done.empty = OMX_TRUE; + GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)", - port->index, buf, buf->omx_buf->pBuffer); - buf->used = FALSE; + buf->port->index, buf, buf->omx_buf->pBuffer); - /* XXX: Some OMX implementations don't reset nOffset - * when the complete buffer is emptied but instead - * only reset nFilledLen. We reset nOffset to 0 - * if nFilledLen == 0, which is safe to do because - * the offset *must* be 0 if the buffer is not - * filled at all. - * - * Seen in QCOM's OMX implementation. - */ - if (buf->omx_buf->nFilledLen == 0) - buf->omx_buf->nOffset = 0; - - /* Reset all flags, some implementations don't - * reset them themselves and the flags are not - * valid anymore after the buffer was consumed - */ - buf->omx_buf->nFlags = 0; - - g_queue_push_tail (port->pending_buffers, buf); - g_cond_broadcast (&port->port_cond); - gst_omx_rec_mutex_recursive_unlock (&port->port_lock); + g_mutex_lock (&comp->messages_lock); + g_queue_push_tail (&comp->messages, msg); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); return OMX_ErrorNone; } @@ -392,29 +524,34 @@ static OMX_ERRORTYPE FillBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_BUFFERHEADERTYPE * pBuffer) { - GstOMXBuffer *buf = pBuffer->pAppPrivate; - GstOMXPort *port; + GstOMXBuffer *buf; GstOMXComponent *comp; + GstOMXMessage *msg; - if (buf == NULL) { + buf = pBuffer->pAppPrivate; + if (!buf) { GST_ERROR ("Have unknown or deallocated buffer %p", pBuffer); return OMX_ErrorNone; } - port = buf->port; - comp = port->comp; - g_assert (buf->omx_buf == pBuffer); - /* Output buffer contains output now or - * the port was flushed */ - gst_omx_rec_mutex_recursive_lock (&port->port_lock); - GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)", port->index, - buf, buf->omx_buf->pBuffer); - buf->used = FALSE; - g_queue_push_tail (port->pending_buffers, buf); - g_cond_broadcast (&port->port_cond); - gst_omx_rec_mutex_recursive_unlock (&port->port_lock); + comp = buf->port->comp; + + msg = g_slice_new (GstOMXMessage); + msg->type = GST_OMX_MESSAGE_BUFFER_DONE; + msg->content.buffer_done.component = hComponent; + msg->content.buffer_done.app_data = pAppData; + msg->content.buffer_done.buffer = pBuffer; + msg->content.buffer_done.empty = OMX_FALSE; + + GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)", + buf->port->index, buf, buf->omx_buf->pBuffer); + + g_mutex_lock (&comp->messages_lock); + g_queue_push_tail (&comp->messages, msg); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); return OMX_ErrorNone; } @@ -422,6 +559,7 @@ FillBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, static OMX_CALLBACKTYPE callbacks = { EventHandler, EmptyBufferDone, FillBufferDone }; +/* NOTE: Uses comp->lock and comp->messages_lock */ GstOMXComponent * gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata) { @@ -457,8 +595,11 @@ gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata) comp->n_in_ports = 0; comp->n_out_ports = 0; - gst_omx_rec_mutex_init (&comp->state_lock); - g_cond_init (&comp->state_cond); + g_mutex_init (&comp->lock); + g_mutex_init (&comp->messages_lock); + g_cond_init (&comp->messages_cond); + + g_queue_init (&comp->messages); comp->pending_state = OMX_StateInvalid; comp->last_error = OMX_ErrorNone; @@ -486,9 +627,14 @@ gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata) OMX_GetState (comp->handle, &comp->state); + g_mutex_lock (&comp->lock); + gst_omx_component_handle_messages (comp); + g_mutex_unlock (&comp->lock); + return comp; } +/* NOTE: Uses comp->messages_lock */ void gst_omx_component_free (GstOMXComponent * comp) { @@ -504,32 +650,30 @@ gst_omx_component_free (GstOMXComponent * comp) GstOMXPort *port = g_ptr_array_index (comp->ports, i); gst_omx_port_deallocate_buffers (port); - - gst_omx_rec_mutex_clear (&port->port_lock); - g_cond_clear (&port->port_cond); - g_queue_free (port->pending_buffers); + g_assert (port->buffers == NULL); + g_assert (g_queue_get_length (&port->pending_buffers) == 0); g_slice_free (GstOMXPort, port); } -#if GLIB_CHECK_VERSION(2,22,0) g_ptr_array_unref (comp->ports); -#else - g_ptr_array_free (comp->ports, TRUE); -#endif comp->ports = NULL; } comp->core->free_handle (comp->handle); gst_omx_core_release (comp->core); - g_cond_clear (&comp->state_cond); - gst_omx_rec_mutex_clear (&comp->state_lock); + gst_omx_component_flush_messages (comp); + + g_cond_clear (&comp->messages_cond); + g_mutex_clear (&comp->messages_lock); + g_mutex_clear (&comp->lock); gst_object_unref (comp->parent); g_slice_free (GstOMXComponent, comp); } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state) { @@ -538,10 +682,14 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state) g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined); - gst_omx_rec_mutex_lock_for_recursion (&comp->state_lock); + g_mutex_lock (&comp->lock); + + gst_omx_component_handle_messages (comp); + old_state = comp->state; GST_DEBUG_OBJECT (comp->parent, "Setting state from %d to %d", old_state, state); + if ((err = comp->last_error) != OMX_ErrorNone && state > old_state) { GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)", gst_omx_error_to_string (err), err); @@ -561,16 +709,18 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state) g_list_free (comp->pending_reconfigure_outports); comp->pending_reconfigure_outports = NULL; /* Notify all inports that are still waiting */ - g_cond_broadcast (&comp->state_cond); + g_mutex_lock (&comp->messages_lock); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); } - gst_omx_rec_mutex_begin_recursion (&comp->state_lock); err = OMX_SendCommand (comp->handle, OMX_CommandStateSet, state, NULL); - gst_omx_rec_mutex_end_recursion (&comp->state_lock); /* No need to check if anything has changed here */ done: - gst_omx_rec_mutex_unlock_for_recursion (&comp->state_lock); + + gst_omx_component_handle_messages (comp); + g_mutex_unlock (&comp->lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -581,6 +731,7 @@ done: return err; } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_STATETYPE gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout) { @@ -592,7 +743,10 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout) GST_DEBUG_OBJECT (comp->parent, "Getting state"); - gst_omx_rec_mutex_lock (&comp->state_lock); + g_mutex_lock (&comp->lock); + + gst_omx_component_handle_messages (comp); + ret = comp->state; if (comp->pending_state == OMX_StateInvalid) goto done; @@ -611,22 +765,32 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout) goto done; wait_until = g_get_monotonic_time () + add; - GST_DEBUG_OBJECT (comp->parent, "Waiting for %ld us", add); + GST_DEBUG_OBJECT (comp->parent, "Waiting for %" G_GINT64_FORMAT "us", add); } else { GST_DEBUG_OBJECT (comp->parent, "Waiting for signal"); } - do { + gst_omx_component_handle_messages (comp); + while (signalled && comp->last_error == OMX_ErrorNone + && comp->pending_state != OMX_StateInvalid) { + g_mutex_lock (&comp->messages_lock); + g_mutex_unlock (&comp->lock); + if (!g_queue_is_empty (&comp->messages)) { + signalled = TRUE; + } if (wait_until == -1) { - g_cond_wait (&comp->state_cond, &comp->state_lock.lock); + g_cond_wait (&comp->messages_cond, &comp->messages_lock); signalled = TRUE; } else { signalled = - g_cond_wait_until (&comp->state_cond, &comp->state_lock.lock, + g_cond_wait_until (&comp->messages_cond, &comp->messages_lock, wait_until); } - } while (signalled && comp->last_error == OMX_ErrorNone - && comp->pending_state != OMX_StateInvalid); + g_mutex_unlock (&comp->messages_lock); + g_mutex_lock (&comp->lock); + if (signalled) + gst_omx_component_handle_messages (comp); + }; if (signalled) { if (comp->last_error != OMX_ErrorNone) { @@ -647,7 +811,7 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout) } done: - gst_omx_rec_mutex_unlock (&comp->state_lock); + g_mutex_unlock (&comp->lock); /* If we waited and timed out this component is unusable now */ if (!signalled) @@ -694,9 +858,7 @@ gst_omx_component_add_port (GstOMXComponent * comp, guint32 index) port->port_def = port_def; - gst_omx_rec_mutex_init (&port->port_lock); - g_cond_init (&port->port_cond); - port->pending_buffers = g_queue_new (); + g_queue_init (&port->pending_buffers); port->flushing = TRUE; port->flushed = FALSE; port->settings_changed = FALSE; @@ -717,12 +879,6 @@ gst_omx_component_get_port (GstOMXComponent * comp, guint32 index) { gint i, n; - /* No need for locking here because the - * ports are all added directly after - * creating the component and are removed - * when the component is destroyed. - */ - n = comp->ports->len; for (i = 0; i < n; i++) { GstOMXPort *tmp = g_ptr_array_index (comp->ports, i); @@ -754,43 +910,31 @@ gst_omx_component_trigger_settings_changed (GstOMXComponent * comp, } } -/* NOTE: This takes comp->state_lock *and* *all* port->port_lock! */ +/* NOTE: Uses comp->lock and comp->messages_lock */ void gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err) { - gint i, n; - g_return_if_fail (comp != NULL); if (err == OMX_ErrorNone) return; + g_mutex_lock (&comp->lock); GST_ERROR_OBJECT (comp->parent, "Setting last error: %s (0x%08x)", gst_omx_error_to_string (err), err); - gst_omx_rec_mutex_recursive_lock (&comp->state_lock); /* We only set the first error ever from which * we can't recover anymore. */ if (comp->last_error == OMX_ErrorNone) comp->last_error = err; - g_cond_broadcast (&comp->state_cond); - gst_omx_rec_mutex_recursive_unlock (&comp->state_lock); + g_mutex_unlock (&comp->lock); - /* Now notify all ports, no locking needed - * here because the ports are allocated in the - * very beginning and never change again until - * component destruction. - */ - n = (comp->ports ? comp->ports->len : 0); - for (i = 0; i < n; i++) { - GstOMXPort *tmp = g_ptr_array_index (comp->ports, i); - - gst_omx_rec_mutex_recursive_lock (&tmp->port_lock); - g_cond_broadcast (&tmp->port_cond); - gst_omx_rec_mutex_recursive_unlock (&tmp->port_lock); - } + g_mutex_lock (&comp->messages_lock); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_component_get_last_error (GstOMXComponent * comp) { @@ -798,9 +942,10 @@ gst_omx_component_get_last_error (GstOMXComponent * comp) g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined); - gst_omx_rec_mutex_lock (&comp->state_lock); + g_mutex_lock (&comp->lock); + gst_omx_component_handle_messages (comp); err = comp->last_error; - gst_omx_rec_mutex_unlock (&comp->state_lock); + g_mutex_unlock (&comp->lock); GST_DEBUG_OBJECT (comp->parent, "Returning last error: %s (0x%08x)", gst_omx_error_to_string (err), err); @@ -816,6 +961,7 @@ gst_omx_component_get_last_error_string (GstOMXComponent * comp) return gst_omx_error_to_string (gst_omx_component_get_last_error (comp)); } +/* comp->lock must be unlocked while calling this */ OMX_ERRORTYPE gst_omx_component_get_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index, gpointer param) @@ -833,6 +979,7 @@ gst_omx_component_get_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index, return err; } +/* comp->lock must be unlocked while calling this */ OMX_ERRORTYPE gst_omx_component_set_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index, gpointer param) @@ -850,6 +997,7 @@ gst_omx_component_set_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index, return err; } +/* comp->lock must be unlocked while calling this */ OMX_ERRORTYPE gst_omx_component_get_config (GstOMXComponent * comp, OMX_INDEXTYPE index, gpointer config) @@ -868,6 +1016,7 @@ gst_omx_component_get_config (GstOMXComponent * comp, OMX_INDEXTYPE index, return err; } +/* comp->lock must be unlocked while calling this */ OMX_ERRORTYPE gst_omx_component_set_config (GstOMXComponent * comp, OMX_INDEXTYPE index, gpointer config) @@ -914,7 +1063,6 @@ gst_omx_port_update_port_definition (GstOMXPort * port, comp = port->comp; - gst_omx_rec_mutex_lock (&port->port_lock); if (port_def) err = gst_omx_component_set_parameter (comp, OMX_IndexParamPortDefinition, @@ -925,11 +1073,10 @@ gst_omx_port_update_port_definition (GstOMXPort * port, GST_DEBUG_OBJECT (comp->parent, "Updated port %u definition: %s (0x%08x)", port->index, gst_omx_error_to_string (err), err); - gst_omx_rec_mutex_unlock (&port->port_lock); - return (err == OMX_ErrorNone); } +/* NOTE: Uses comp->lock and comp->messages_lock */ GstOMXAcquireBufferReturn gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf) { @@ -945,14 +1092,14 @@ gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf) comp = port->comp; + g_mutex_lock (&comp->lock); GST_DEBUG_OBJECT (comp->parent, "Acquiring buffer from port %u", port->index); - gst_omx_rec_mutex_lock (&port->port_lock); - retry: + gst_omx_component_handle_messages (comp); /* Check if the component is in an error state */ - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + if ((err = comp->last_error) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s", gst_omx_error_to_string (err)); ret = GST_OMX_ACQUIRE_BUFFER_ERROR; @@ -972,16 +1119,19 @@ retry: */ if (port->port_def.eDir == OMX_DirInput) { if (g_atomic_int_get (&comp->have_pending_reconfigure_outports)) { - gst_omx_rec_mutex_unlock (&port->port_lock); - gst_omx_rec_mutex_lock (&comp->state_lock); + gst_omx_component_handle_messages (comp); while (g_atomic_int_get (&comp->have_pending_reconfigure_outports) && (err = comp->last_error) == OMX_ErrorNone && !port->flushing) { GST_DEBUG_OBJECT (comp->parent, "Waiting for output ports to reconfigure"); - g_cond_wait (&comp->state_cond, &comp->state_lock.lock); + g_mutex_lock (&comp->messages_lock); + g_mutex_unlock (&comp->lock); + if (g_queue_is_empty (&comp->messages)) + g_cond_wait (&comp->messages_cond, &comp->messages_lock); + g_mutex_unlock (&comp->messages_lock); + g_mutex_lock (&comp->lock); + gst_omx_component_handle_messages (comp); } - gst_omx_rec_mutex_unlock (&comp->state_lock); - gst_omx_rec_mutex_lock (&port->port_lock); goto retry; } @@ -1001,11 +1151,11 @@ retry: * we have to drop them... */ if (port->port_def.eDir == OMX_DirOutput && port->settings_cookie != port->configured_settings_cookie) { - if (!g_queue_is_empty (port->pending_buffers)) { + if (!g_queue_is_empty (&port->pending_buffers)) { GST_DEBUG_OBJECT (comp->parent, "Output port %u needs reconfiguration but has buffers pending", port->index); - _buf = g_queue_pop_head (port->pending_buffers); + _buf = g_queue_pop_head (&port->pending_buffers); ret = GST_OMX_ACQUIRE_BUFFER_OK; goto done; @@ -1034,21 +1184,31 @@ retry: * arrives, an error happens, the port is flushing * or the port needs to be reconfigured. */ - if (g_queue_is_empty (port->pending_buffers)) { + gst_omx_component_handle_messages (comp); + if (g_queue_is_empty (&port->pending_buffers)) { GST_DEBUG_OBJECT (comp->parent, "Queue of port %u is empty", port->index); - g_cond_wait (&port->port_cond, &port->port_lock.lock); + g_mutex_lock (&comp->messages_lock); + g_mutex_unlock (&comp->lock); + if (g_queue_is_empty (&comp->messages)) + g_cond_wait (&comp->messages_cond, &comp->messages_lock); + g_mutex_unlock (&comp->messages_lock); + g_mutex_lock (&comp->lock); + gst_omx_component_handle_messages (comp); + + /* And now check everything again and maybe get a buffer */ + goto retry; } else { GST_DEBUG_OBJECT (comp->parent, "Port %u has pending buffers", port->index); - _buf = g_queue_pop_head (port->pending_buffers); + _buf = g_queue_pop_head (&port->pending_buffers); ret = GST_OMX_ACQUIRE_BUFFER_OK; goto done; } - /* And now check everything again and maybe get a buffer */ + g_assert_not_reached (); goto retry; done: - gst_omx_rec_mutex_unlock (&port->port_lock); + g_mutex_unlock (&comp->lock); if (_buf) { g_assert (_buf == _buf->omx_buf->pAppPrivate); @@ -1061,6 +1221,7 @@ done: return ret; } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf) { @@ -1073,10 +1234,12 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf) comp = port->comp; + g_mutex_lock (&comp->lock); + GST_DEBUG_OBJECT (comp->parent, "Releasing buffer %p (%p) to port %u", buf, buf->omx_buf->pBuffer, port->index); - gst_omx_rec_mutex_lock_for_recursion (&port->port_lock); + gst_omx_component_handle_messages (comp); if (port->port_def.eDir == OMX_DirInput) { /* Reset all flags, some implementations don't @@ -1086,19 +1249,23 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf) buf->omx_buf->nFlags = 0; } - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + if ((err = comp->last_error) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s (0x%08x)", gst_omx_error_to_string (err), err); - g_queue_push_tail (port->pending_buffers, buf); - g_cond_broadcast (&port->port_cond); + g_queue_push_tail (&port->pending_buffers, buf); + g_mutex_lock (&comp->messages_lock); + g_cond_broadcast (&comp->messages_cond); + g_mutex_lock (&comp->messages_lock); goto done; } if (port->flushing) { GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing, not releasing buffer", port->index); - g_queue_push_tail (port->pending_buffers, buf); - g_cond_broadcast (&port->port_cond); + g_queue_push_tail (&port->pending_buffers, buf); + g_mutex_lock (&comp->messages_lock); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); goto done; } @@ -1108,23 +1275,25 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf) buf->used = TRUE; - gst_omx_rec_mutex_begin_recursion (&port->port_lock); if (port->port_def.eDir == OMX_DirInput) { err = OMX_EmptyThisBuffer (comp->handle, buf->omx_buf); } else { err = OMX_FillThisBuffer (comp->handle, buf->omx_buf); } - gst_omx_rec_mutex_end_recursion (&port->port_lock); - GST_DEBUG_OBJECT (comp->parent, "Released buffer %p to port %u: %s (0x%08x)", buf, port->index, gst_omx_error_to_string (err), err); done: - gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock); + gst_omx_component_handle_messages (comp); + g_mutex_unlock (&comp->lock); + + if (err != OMX_ErrorNone) + gst_omx_component_set_last_error (comp, err); return err; } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) { @@ -1134,33 +1303,34 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) g_return_val_if_fail (port != NULL, OMX_ErrorUndefined); comp = port->comp; + + g_mutex_lock (&comp->lock); + GST_DEBUG_OBJECT (comp->parent, "Setting port %d to %sflushing", port->index, (flush ? "" : "not ")); - gst_omx_rec_mutex_lock_for_recursion (&port->port_lock); + gst_omx_component_handle_messages (comp); + if (! !flush == ! !port->flushing) { GST_DEBUG_OBJECT (comp->parent, "Port %u was %sflushing already", port->index, (flush ? "" : "not ")); goto done; } - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + if ((err = comp->last_error) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s (0x%08x)", gst_omx_error_to_string (err), err); goto done; } - gst_omx_rec_mutex_lock (&comp->state_lock); if (comp->state != OMX_StateIdle && comp->state != OMX_StateExecuting) { GST_DEBUG_OBJECT (comp->parent, "Component is in wrong state: %d", comp->state); err = OMX_ErrorUndefined; - gst_omx_rec_mutex_unlock (&comp->state_lock); goto done; } - gst_omx_rec_mutex_unlock (&comp->state_lock); port->flushing = flush; if (flush) { @@ -1168,25 +1338,14 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) gboolean signalled; OMX_ERRORTYPE last_error; - g_cond_broadcast (&port->port_cond); - - /* We also need to signal the state cond because - * an input port might wait on this for the output - * ports to reconfigure. This will not confuse - * other waiters on the state cond because they will - * additionally check if the condition they're waiting - * for is true after waking up. - */ - gst_omx_rec_mutex_lock (&comp->state_lock); - g_cond_broadcast (&comp->state_cond); - gst_omx_rec_mutex_unlock (&comp->state_lock); + g_mutex_lock (&comp->messages_lock); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); /* Now flush the port */ port->flushed = FALSE; - gst_omx_rec_mutex_begin_recursion (&port->port_lock); err = OMX_SendCommand (comp->handle, OMX_CommandFlush, port->index, NULL); - gst_omx_rec_mutex_end_recursion (&port->port_lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -1195,7 +1354,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) goto done; } - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + if ((err = comp->last_error) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s (0x%08x)", gst_omx_error_to_string (err), err); @@ -1215,13 +1374,24 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) * the flush command completed */ signalled = TRUE; last_error = OMX_ErrorNone; + gst_omx_component_handle_messages (comp); while (signalled && last_error == OMX_ErrorNone && !port->flushed - && port->buffers->len > g_queue_get_length (port->pending_buffers)) { - signalled = - g_cond_wait_until (&comp->state_cond, &comp->state_lock.lock, - wait_until); + && port->buffers->len > g_queue_get_length (&port->pending_buffers)) { + g_mutex_lock (&comp->messages_lock); + g_mutex_unlock (&comp->lock); + if (!g_queue_is_empty (&comp->messages)) + signalled = TRUE; + else + signalled = + g_cond_wait_until (&comp->messages_cond, &comp->messages_lock, + wait_until); + g_mutex_unlock (&comp->messages_lock); + g_mutex_lock (&comp->lock); - last_error = gst_omx_component_get_last_error (comp); + if (signalled) + gst_omx_component_handle_messages (comp); + + last_error = comp->last_error; } port->flushed = FALSE; @@ -1243,7 +1413,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) GstOMXBuffer *buf; /* Enqueue all buffers for the component to fill */ - while ((buf = g_queue_pop_head (port->pending_buffers))) { + while ((buf = g_queue_pop_head (&port->pending_buffers))) { if (!buf) continue; @@ -1255,9 +1425,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) */ buf->omx_buf->nFlags = 0; - gst_omx_rec_mutex_begin_recursion (&port->port_lock); err = OMX_FillThisBuffer (comp->handle, buf->omx_buf); - gst_omx_rec_mutex_end_recursion (&port->port_lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -1275,23 +1443,21 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) done: GST_DEBUG_OBJECT (comp->parent, "Set port %u to %sflushing: %s (0x%08x)", port->index, (flush ? "" : "not "), gst_omx_error_to_string (err), err); - gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock); + gst_omx_component_handle_messages (comp); + g_mutex_unlock (&comp->lock); return err; error: { - /* Need to unlock the port lock here because - * set_last_error() needs all port locks. - * This is safe here because we're just going - * to error out anyway */ - gst_omx_rec_mutex_unlock (&port->port_lock); + g_mutex_unlock (&comp->lock); gst_omx_component_set_last_error (comp, err); - gst_omx_rec_mutex_lock (&port->port_lock); + g_mutex_lock (&comp->lock); goto done; } } +/* NOTE: Uses comp->lock and comp->messages_lock */ gboolean gst_omx_port_is_flushing (GstOMXPort * port) { @@ -1302,9 +1468,10 @@ gst_omx_port_is_flushing (GstOMXPort * port) comp = port->comp; - gst_omx_rec_mutex_lock (&port->port_lock); + g_mutex_lock (&comp->lock); + gst_omx_component_handle_messages (port->comp); flushing = port->flushing; - gst_omx_rec_mutex_unlock (&port->port_lock); + g_mutex_unlock (&comp->lock); GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing: %d", port->index, flushing); @@ -1312,7 +1479,7 @@ gst_omx_port_is_flushing (GstOMXPort * port) return flushing; } -/* Must be called while holding port->lock */ +/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */ static OMX_ERRORTYPE gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port) { @@ -1324,7 +1491,8 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port) comp = port->comp; - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + gst_omx_component_handle_messages (port->comp); + if ((err = comp->last_error) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)", gst_omx_error_to_string (err), err); goto done; @@ -1373,11 +1541,9 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port) buf->settings_cookie = port->settings_cookie; g_ptr_array_add (port->buffers, buf); - gst_omx_rec_mutex_begin_recursion (&port->port_lock); err = OMX_AllocateBuffer (comp->handle, &buf->omx_buf, port->index, buf, port->port_def.nBufferSize); - gst_omx_rec_mutex_end_recursion (&port->port_lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -1392,9 +1558,11 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port) g_assert (buf->omx_buf->pAppPrivate == buf); /* In the beginning all buffers are not owned by the component */ - g_queue_push_tail (port->pending_buffers, buf); + g_queue_push_tail (&port->pending_buffers, buf); } + gst_omx_component_handle_messages (comp); + done: GST_DEBUG_OBJECT (comp->parent, "Allocated buffers for port %u: %s (0x%08x)", port->index, gst_omx_error_to_string (err), err); @@ -1403,17 +1571,14 @@ done: error: { - /* Need to unlock the port lock here because - * set_last_error() needs all port locks. - * This is safe here because we're just going - * to error out anyway */ - gst_omx_rec_mutex_unlock (&port->port_lock); + g_mutex_unlock (&comp->lock); gst_omx_component_set_last_error (comp, err); - gst_omx_rec_mutex_lock (&port->port_lock); + g_mutex_lock (&comp->lock); goto done; } } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_port_allocate_buffers (GstOMXPort * port) { @@ -1421,14 +1586,14 @@ gst_omx_port_allocate_buffers (GstOMXPort * port) g_return_val_if_fail (port != NULL, OMX_ErrorUndefined); - gst_omx_rec_mutex_lock_for_recursion (&port->port_lock); + g_mutex_lock (&port->comp->lock); err = gst_omx_port_allocate_buffers_unlocked (port); - gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock); + g_mutex_unlock (&port->comp->lock); return err; } -/* Must be called while holding port->lock */ +/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */ static OMX_ERRORTYPE gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port) { @@ -1441,13 +1606,15 @@ gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port) GST_DEBUG_OBJECT (comp->parent, "Deallocating buffers of port %u", port->index); + gst_omx_component_handle_messages (port->comp); + if (!port->buffers) { GST_DEBUG_OBJECT (comp->parent, "No buffers allocated for port %u", port->index); goto done; } - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + if ((err = comp->last_error) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)", gst_omx_error_to_string (err), err); /* We still try to deallocate all buffers */ @@ -1478,9 +1645,7 @@ gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port) GST_DEBUG_OBJECT (comp->parent, "Deallocating buffer %p (%p)", buf, buf->omx_buf->pBuffer); - gst_omx_rec_mutex_begin_recursion (&port->port_lock); tmp = OMX_FreeBuffer (comp->handle, port->index, buf->omx_buf); - gst_omx_rec_mutex_end_recursion (&port->port_lock); if (tmp != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -1492,15 +1657,12 @@ gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port) } g_slice_free (GstOMXBuffer, buf); } - - g_queue_clear (port->pending_buffers); -#if GLIB_CHECK_VERSION(2,22,0) + g_queue_clear (&port->pending_buffers); g_ptr_array_unref (port->buffers); -#else - g_ptr_array_free (port->buffers, TRUE); -#endif port->buffers = NULL; + gst_omx_component_handle_messages (comp); + done: GST_DEBUG_OBJECT (comp->parent, "Deallocated buffers of port %u: %s (0x%08x)", port->index, gst_omx_error_to_string (err), err); @@ -1508,6 +1670,7 @@ done: return err; } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_port_deallocate_buffers (GstOMXPort * port) { @@ -1515,14 +1678,14 @@ gst_omx_port_deallocate_buffers (GstOMXPort * port) g_return_val_if_fail (port != NULL, OMX_ErrorUndefined); - gst_omx_rec_mutex_lock_for_recursion (&port->port_lock); + g_mutex_lock (&port->comp->lock); err = gst_omx_port_deallocate_buffers_unlocked (port); - gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock); + g_mutex_unlock (&port->comp->lock); return err; } -/* Must be called while holding port->lock */ +/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */ static OMX_ERRORTYPE gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) { @@ -1534,7 +1697,9 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) comp = port->comp; - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + gst_omx_component_handle_messages (comp); + + if ((err = comp->last_error) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)", gst_omx_error_to_string (err), err); goto done; @@ -1558,7 +1723,6 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) port->flushing = TRUE; } - gst_omx_rec_mutex_begin_recursion (&port->port_lock); if (enabled) err = OMX_SendCommand (comp->handle, OMX_CommandPortEnable, port->index, @@ -1567,7 +1731,6 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) err = OMX_SendCommand (comp->handle, OMX_CommandPortDisable, port->index, NULL); - gst_omx_rec_mutex_end_recursion (&port->port_lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -1576,7 +1739,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) goto error; } - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + if ((err = comp->last_error) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)", gst_omx_error_to_string (err), err); goto done; @@ -1593,11 +1756,23 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) /* First wait until all buffers are released by the port */ signalled = TRUE; last_error = OMX_ErrorNone; + gst_omx_component_handle_messages (comp); while (signalled && last_error == OMX_ErrorNone && (port->buffers - && port->buffers->len > g_queue_get_length (port->pending_buffers))) { - signalled = - g_cond_wait_until (&port->port_cond, &port->port_lock.lock, wait_until); - last_error = gst_omx_component_get_last_error (comp); + && port->buffers->len > + g_queue_get_length (&port->pending_buffers))) { + g_mutex_lock (&comp->messages_lock); + g_mutex_unlock (&comp->lock); + if (!g_queue_is_empty (&comp->messages)) + signalled = TRUE; + else + signalled = + g_cond_wait_until (&comp->messages_cond, &comp->messages_lock, + wait_until); + g_mutex_unlock (&comp->messages_lock); + g_mutex_lock (&comp->lock); + if (signalled) + gst_omx_component_handle_messages (comp); + last_error = comp->last_error; } if (last_error != OMX_ErrorNone) { @@ -1633,14 +1808,26 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) last_error = OMX_ErrorNone; gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition, &port->port_def); + gst_omx_component_handle_messages (comp); while (signalled && last_error == OMX_ErrorNone && (! !port->port_def.bEnabled != ! !enabled || !port->enabled_changed)) { - signalled = - g_cond_wait_until (&port->port_cond, &port->port_lock.lock, wait_until); - last_error = gst_omx_component_get_last_error (comp); + g_mutex_lock (&comp->messages_lock); + g_mutex_unlock (&comp->lock); + if (!g_queue_is_empty (&comp->messages)) + signalled = TRUE; + else + signalled = + g_cond_wait_until (&comp->messages_cond, &comp->messages_lock, + wait_until); + g_mutex_unlock (&comp->messages_lock); + g_mutex_lock (&comp->lock); + if (signalled) + gst_omx_component_handle_messages (comp); + last_error = comp->last_error; gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition, &port->port_def); } + port->enabled_changed = FALSE; if (!signalled) { GST_ERROR_OBJECT (comp->parent, @@ -1665,7 +1852,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) GstOMXBuffer *buf; /* Enqueue all buffers for the component to fill */ - while ((buf = g_queue_pop_head (port->pending_buffers))) { + while ((buf = g_queue_pop_head (&port->pending_buffers))) { if (!buf) continue; @@ -1677,9 +1864,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) */ buf->omx_buf->nFlags = 0; - gst_omx_rec_mutex_begin_recursion (&port->port_lock); err = OMX_FillThisBuffer (comp->handle, buf->omx_buf); - gst_omx_rec_mutex_end_recursion (&port->port_lock); if (err != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, @@ -1694,6 +1879,8 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled) } } + gst_omx_component_handle_messages (comp); + done: GST_DEBUG_OBJECT (comp->parent, "Port %u is %s%s: %s (0x%08x)", port->index, (err == OMX_ErrorNone ? "" : "not "), @@ -1703,17 +1890,14 @@ done: error: { - /* Need to unlock the port lock here because - * set_last_error() needs all port locks. - * This is safe here because we're just going - * to error out anyway */ - gst_omx_rec_mutex_unlock (&port->port_lock); + g_mutex_unlock (&comp->lock); gst_omx_component_set_last_error (comp, err); - gst_omx_rec_mutex_lock (&port->port_lock); + g_mutex_lock (&comp->lock); goto done; } } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled) { @@ -1721,9 +1905,9 @@ gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled) g_return_val_if_fail (port != NULL, OMX_ErrorUndefined); - gst_omx_rec_mutex_lock_for_recursion (&port->port_lock); + g_mutex_lock (&port->comp->lock); err = gst_omx_port_set_enabled_unlocked (port, enabled); - gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock); + g_mutex_unlock (&port->comp->lock); return err; } @@ -1738,11 +1922,9 @@ gst_omx_port_is_enabled (GstOMXPort * port) comp = port->comp; - gst_omx_rec_mutex_lock (&port->port_lock); gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition, &port->port_def); - enabled = port->port_def.bEnabled; - gst_omx_rec_mutex_unlock (&port->port_lock); + enabled = ! !port->port_def.bEnabled; GST_DEBUG_OBJECT (comp->parent, "Port %u is enabled: %d", port->index, enabled); @@ -1750,6 +1932,7 @@ gst_omx_port_is_enabled (GstOMXPort * port) return enabled; } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_port_reconfigure (GstOMXPort * port) { @@ -1760,14 +1943,15 @@ gst_omx_port_reconfigure (GstOMXPort * port) comp = port->comp; + g_mutex_lock (&comp->lock); GST_DEBUG_OBJECT (comp->parent, "Reconfiguring port %u", port->index); - gst_omx_rec_mutex_lock_for_recursion (&port->port_lock); + gst_omx_component_handle_messages (comp); if (!port->settings_changed) goto done; - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) + if ((err = comp->last_error) != OMX_ErrorNone) goto done; /* Disable and enable the port. This already takes @@ -1790,7 +1974,6 @@ gst_omx_port_reconfigure (GstOMXPort * port) if (port->port_def.eDir == OMX_DirOutput) { GList *l; - gst_omx_rec_mutex_lock (&comp->state_lock); for (l = comp->pending_reconfigure_outports; l; l = l->next) { if (l->data == (gpointer) port) { comp->pending_reconfigure_outports = @@ -1800,20 +1983,22 @@ gst_omx_port_reconfigure (GstOMXPort * port) } if (!comp->pending_reconfigure_outports) { g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0); - g_cond_broadcast (&comp->state_cond); + g_mutex_lock (&comp->messages_lock); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); } - gst_omx_rec_mutex_unlock (&comp->state_lock); } done: GST_DEBUG_OBJECT (comp->parent, "Reconfigured port %u: %s (0x%08x)", port->index, gst_omx_error_to_string (err), err); - gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock); + g_mutex_unlock (&comp->lock); return err; } +/* NOTE: Uses comp->lock and comp->messages_lock */ OMX_ERRORTYPE gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) { @@ -1824,12 +2009,14 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) comp = port->comp; + g_mutex_lock (&comp->lock); + GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u %s", port->index, (start ? "start" : "stsop")); - gst_omx_rec_mutex_lock (&port->port_lock); + gst_omx_component_handle_messages (comp); - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) + if ((err = comp->last_error) != OMX_ErrorNone) goto done; if (start) @@ -1841,7 +2028,6 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) GList *l; if (start) { - gst_omx_rec_mutex_lock (&comp->state_lock); for (l = comp->pending_reconfigure_outports; l; l = l->next) { if (l->data == (gpointer) port) break; @@ -1852,9 +2038,7 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) g_list_prepend (comp->pending_reconfigure_outports, port); g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1); } - gst_omx_rec_mutex_unlock (&comp->state_lock); } else { - gst_omx_rec_mutex_lock (&comp->state_lock); for (l = comp->pending_reconfigure_outports; l; l = l->next) { if (l->data == (gpointer) port) { comp->pending_reconfigure_outports = @@ -1864,19 +2048,20 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start) } if (!comp->pending_reconfigure_outports) { g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0); - g_cond_broadcast (&comp->state_cond); + g_mutex_lock (&comp->messages_lock); + g_cond_broadcast (&comp->messages_cond); + g_mutex_unlock (&comp->messages_lock); } - gst_omx_rec_mutex_unlock (&comp->state_lock); } } done: - gst_omx_rec_mutex_unlock (&port->port_lock); - GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u: %s (0x%08x)", port->index, gst_omx_error_to_string (err), err); + g_mutex_unlock (&comp->lock); + return err; } diff --git a/omx/gstomx.h b/omx/gstomx.h index 3beb9e4f2b..2f8347be73 100644 --- a/omx/gstomx.h +++ b/omx/gstomx.h @@ -1,6 +1,8 @@ /* * Copyright (C) 2011, Hewlett-Packard Development Company, L.P. * Author: Sebastian Dröge , Collabora Ltd. + * Copyright (C) 2013, Collabora Ltd. + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -52,8 +54,6 @@ #pragma pack() #endif -#include "gstomxrecmutex.h" - G_BEGIN_DECLS #define GST_OMX_INIT_STRUCT(st) G_STMT_START { \ @@ -112,6 +112,7 @@ typedef enum _GstOMXPortDirection GstOMXPortDirection; typedef struct _GstOMXComponent GstOMXComponent; typedef struct _GstOMXBuffer GstOMXBuffer; typedef struct _GstOMXClassData GstOMXClassData; +typedef struct _GstOMXMessage GstOMXMessage; typedef enum { /* Everything good and the buffer is valid */ @@ -137,8 +138,6 @@ struct _GstOMXCore { gint user_count; /* LOCK */ /* OpenMAX core library functions, protected with LOCK */ - /* FIXME: OpenMAX spec does not specify that this is required - * but gst-openmax does it */ OMX_ERRORTYPE (*init) (void); OMX_ERRORTYPE (*deinit) (void); OMX_ERRORTYPE (*get_handle) (OMX_HANDLETYPE * handle, @@ -146,32 +145,51 @@ struct _GstOMXCore { OMX_ERRORTYPE (*free_handle) (OMX_HANDLETYPE handle); }; +typedef enum { + GST_OMX_MESSAGE_STATE_SET, + GST_OMX_MESSAGE_FLUSH, + GST_OMX_MESSAGE_ERROR, + GST_OMX_MESSAGE_PORT_ENABLE, + GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED, + GST_OMX_MESSAGE_BUFFER_DONE, +} GstOMXMessageType; + +struct _GstOMXMessage { + GstOMXMessageType type; + + union { + struct { + OMX_STATETYPE state; + } state_set; + struct { + OMX_U32 port; + } flush; + struct { + OMX_ERRORTYPE error; + } error; + struct { + OMX_U32 port; + OMX_BOOL enable; + } port_enable; + struct { + OMX_U32 port; + } port_settings_changed; + struct { + OMX_HANDLETYPE component; + OMX_PTR app_data; + OMX_BUFFERHEADERTYPE *buffer; + OMX_BOOL empty; + } buffer_done; + } content; +}; + struct _GstOMXPort { GstOMXComponent *comp; guint32 index; - /* Protects port_def, buffers, pending_buffers, - * settings_changed, flushing, flushed, enabled_changed - * and settings_cookie. - * - * Signalled if pending_buffers gets a - * new buffer or flushing/flushed is set - * to TRUE or the port is enabled/disabled - * or the settings change or an error happens. - * - * Note: Always check comp->last_error before - * waiting and after being signalled! - * - * Note: flushed==TRUE implies flushing==TRUE! - * - * Note: This lock must always be taken before - * the component's state lock if both are needed! - */ - GstOMXRecMutex port_lock; - GCond port_cond; OMX_PARAM_PORTDEFINITIONTYPE port_def; GPtrArray *buffers; /* Contains GstOMXBuffer* */ - GQueue *pending_buffers; /* Contains GstOMXBuffer* */ + GQueue pending_buffers; /* Contains GstOMXBuffer* */ /* If TRUE we need to get the new caps of this port */ gboolean settings_changed; gboolean flushing; @@ -193,15 +211,20 @@ struct _GstOMXComponent { guint64 hacks; /* Flags, GST_OMX_HACK_* */ + /* Added once, never changed. No locks necessary */ GPtrArray *ports; /* Contains GstOMXPort* */ gint n_in_ports, n_out_ports; - /* Protecting state, pending_state, last_error, - * pending_reconfigure_outports. - * Signalled if one of them changes - */ - GstOMXRecMutex state_lock; - GCond state_cond; + /* Locking order: lock -> messages_lock + * + * Never hold lock while waiting for messages_cond + * Always check that messages is empty before waiting */ + GMutex lock; + + GQueue messages; /* Queue of GstOMXMessages */ + GMutex messages_lock; + GCond messages_cond; + OMX_STATETYPE state; /* OMX_StateInvalid if no pending state */ OMX_STATETYPE pending_state; diff --git a/omx/gstomxaudioenc.c b/omx/gstomxaudioenc.c index 0590a97e9c..6b4a3d532f 100644 --- a/omx/gstomxaudioenc.c +++ b/omx/gstomxaudioenc.c @@ -901,10 +901,12 @@ gst_omx_audio_enc_sink_event (GstAudioEncoder * encoder, GstEvent * event) GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers"); /* Insert a NULL into the queue to signal EOS */ - gst_omx_rec_mutex_lock (&self->out_port->port_lock); - g_queue_push_tail (self->out_port->pending_buffers, NULL); - g_cond_broadcast (&self->out_port->port_cond); - gst_omx_rec_mutex_unlock (&self->out_port->port_lock); + g_mutex_lock (&self->component->lock); + g_queue_push_tail (&self->out_port->pending_buffers, NULL); + g_mutex_unlock (&self->component->lock); + g_mutex_lock (&self->component->messages_lock); + g_cond_broadcast (&self->component->messages_cond); + g_mutex_unlock (&self->component->messages_lock); return TRUE; } diff --git a/omx/gstomxrecmutex.c b/omx/gstomxrecmutex.c deleted file mode 100644 index c12b3aa42b..0000000000 --- a/omx/gstomxrecmutex.c +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright (C) 2012, Collabora Ltd. - * Author: George Kiagiadakis - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation - * version 2.1 of the License. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "gstomxrecmutex.h" - -void -gst_omx_rec_mutex_init (GstOMXRecMutex * mutex) -{ - g_mutex_init (&mutex->lock); - g_mutex_init (&mutex->recursion_lock); - g_cond_init (&mutex->recursion_wait_cond); - g_atomic_int_set (&mutex->recursion_allowed, FALSE); - g_atomic_int_set (&mutex->recursion_pending, FALSE); -} - -void -gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex) -{ - g_mutex_clear (&mutex->lock); - g_mutex_clear (&mutex->recursion_lock); -} - -void -gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex) -{ - g_mutex_lock (&mutex->lock); -} - -void -gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex) -{ - g_mutex_unlock (&mutex->lock); -} - -void -gst_omx_rec_mutex_lock_for_recursion (GstOMXRecMutex * mutex) -{ - gboolean exchanged; - - g_mutex_lock (&mutex->recursion_lock); - - while (g_atomic_int_get (&mutex->recursion_allowed)) - g_cond_wait (&mutex->recursion_wait_cond, &mutex->recursion_lock); - - g_mutex_lock (&mutex->lock); - exchanged = - g_atomic_int_compare_and_exchange (&mutex->recursion_pending, FALSE, - TRUE); - g_assert (exchanged); -} - -void -gst_omx_rec_mutex_unlock_for_recursion (GstOMXRecMutex * mutex) -{ - gboolean exchanged; - - exchanged = - g_atomic_int_compare_and_exchange (&mutex->recursion_pending, TRUE, - FALSE); - g_assert (exchanged); - g_mutex_unlock (&mutex->lock); - g_cond_broadcast (&mutex->recursion_wait_cond); - g_mutex_unlock (&mutex->recursion_lock); -} - -/* must be called with mutex->lock taken */ -void -gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex) -{ - gboolean exchanged; - - exchanged = - g_atomic_int_compare_and_exchange (&mutex->recursion_allowed, FALSE, - TRUE); - g_assert (exchanged); - exchanged = - g_atomic_int_compare_and_exchange (&mutex->recursion_pending, TRUE, - FALSE); - g_assert (exchanged); - g_mutex_unlock (&mutex->recursion_lock); -} - -/* must be called with mutex->lock taken */ -void -gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex) -{ - gboolean exchanged; - - g_mutex_lock (&mutex->recursion_lock); - exchanged = - g_atomic_int_compare_and_exchange (&mutex->recursion_allowed, TRUE, - FALSE); - g_assert (exchanged); - exchanged = - g_atomic_int_compare_and_exchange (&mutex->recursion_pending, FALSE, - TRUE); - g_assert (exchanged); -} - -void -gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex) -{ - g_mutex_lock (&mutex->recursion_lock); - if (!g_atomic_int_get (&mutex->recursion_allowed)) { - /* no recursion allowed, lock the proper mutex */ - g_mutex_lock (&mutex->lock); - g_mutex_unlock (&mutex->recursion_lock); - } -} - -void -gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex) -{ - /* It is safe to check recursion_allowed here because - * we hold at least one of the two locks and - * either lock protects it from being changed. - */ - if (g_atomic_int_get (&mutex->recursion_allowed)) { - g_mutex_unlock (&mutex->recursion_lock); - } else { - g_mutex_unlock (&mutex->lock); - } -} diff --git a/omx/gstomxrecmutex.h b/omx/gstomxrecmutex.h deleted file mode 100644 index 4645302a6d..0000000000 --- a/omx/gstomxrecmutex.h +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (C) 2012, Collabora Ltd. - * Author: George Kiagiadakis - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation - * version 2.1 of the License. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * - */ - -#ifndef __GST_OMX_REC_MUTEX_H__ -#define __GST_OMX_REC_MUTEX_H__ - -#include - -G_BEGIN_DECLS - -/* - * This is a recursive mutex implementation that serves a very specific - * purpose; it is used to allow OpenMAX callbacks to be run in the context - * of some OpenMAX function call while the calling function is holding the - * master lock. - * - * According to the OpenMAX specification, we have 2 possible ways that - * callbacks may be called. First, we have out-of-context calls, which means - * that callbacks are called from a different thread at any point in time. - * In this case, callbacks must take the appropriate lock to protect the data - * that they are changing. Second, we have in-context calls, which means - * that callbacks are called when we call some OpenMAX function, before this - * function returns. In this case, we need to allow the callback to run - * without taking any locks that the caller of the OpenMAX function is holding. - * - * This can be solved by a recusrive mutex. However, a normal GRecMutex is - * not enough because it allows being locked multiple times only from - * the same thread. Unfortunatly, what we see in Broadcom's implementation, - * for instance, OpenMAX callbacks may be in-context, but from a different - * thread. This is achieved like this: - * - * - OMX_Foo is called - * - OMX_Foo waits on a condition - * - The callback is executed in a different thread - * - When the callback returns, its calling function - * signals the condition that OMX_Foo waits on - * - OMX_Foo wakes up and returns - * - * This recursive mutex implementation attempts to fix this problem. - * This is achieved like this: All functions lock this mutex normally - * using gst_omx_rec_mutex_lock() / _unlock(). These functions - * effectively lock the master mutex and they are identical in behavior - * with g_mutex_lock() / _unlock(). When a function that has already - * locked this mutex is about to call some OpenMAX function, it must - * call gst_omx_rec_mutex_begin_recursion() to indicate that recursive - * locking is now allowed, and similarly, call gst_omx_rec_mutex_end_recursion() - * after the OpenMAX function has returned to indicate that no recursive - * locking is allowed from this point on. Callbacks should lock the - * mutex using gst_omx_rec_mutex_recursive_lock() / _recursive_unlock(). - * These two functions, depending on whether recursion is allowed - * will take/release either the master lock or the recursion_lock. - * Effectively, this allows callbacks to run in the context any code between - * calls to gst_omx_rec_mutex_begin_recursion() / _end_recursion(). - * - * Note that this doesn't prevent out-of-context callback executions - * to be run at that point, but due to the fact that _end_recursion() - * also locks the recursion_lock, it is at least guaranteed that they - * will have finished their execution before _end_recursion() returns. - */ -typedef struct _GstOMXRecMutex GstOMXRecMutex; - -struct _GstOMXRecMutex { - /* The master lock */ - GMutex lock; - - /* This lock is taken when recursing. - * The master lock must always be taken before this one, - * by the caller of _begin_recursion(). - */ - GMutex recursion_lock; - - /* Indicates whether recursion is allowed. - * When it is allowed, _recursive_lock() takes - * the recursion_lock instead of the master lock. - * This variable is protected by both locks. - */ - volatile gint recursion_allowed; - - /* Indicates whether lock is locked and recursion - * will be allowed soon - */ - volatile gint recursion_pending; - - GCond recursion_wait_cond; -}; - -void gst_omx_rec_mutex_init (GstOMXRecMutex * mutex); -void gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex); - -void gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex); -void gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex); - -void gst_omx_rec_mutex_lock_for_recursion (GstOMXRecMutex * mutex); -void gst_omx_rec_mutex_unlock_for_recursion (GstOMXRecMutex * mutex); - -void gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex); -void gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex); - -void gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex); -void gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex); - -G_END_DECLS - -#endif /* __GST_OMX_REC_MUTEX_H__ */