omx: Implement new approach for locking that should solve all deadlocks on RPi

No mutex is locked while calling any OpenMAX functions anymore
and everything from the OpenMAX callbacks is inserted into a message
queue and handled from outside the callbacks.

Also there's only a single mutex and condition variable per component
now for handling anything from OpenMAX callbacks and a single mutex
for keeping our component/port state sane.
This commit is contained in:
Sebastian Dröge 2013-01-10 14:44:33 +01:00
parent 1bf69c8f69
commit a42f27547d
6 changed files with 575 additions and 632 deletions

View file

@ -12,8 +12,7 @@ libgstopenmax_la_SOURCES = \
gstomxmpeg4videoenc.c \
gstomxh264enc.c \
gstomxh263enc.c \
gstomxaacenc.c \
gstomxrecmutex.c
gstomxaacenc.c
noinst_HEADERS = \
gstomx.h \
@ -27,8 +26,7 @@ noinst_HEADERS = \
gstomxmpeg4videoenc.h \
gstomxh264enc.h \
gstomxh263enc.h \
gstomxaacenc.h \
gstomxrecmutex.h
gstomxaacenc.h
libgstopenmax_la_CFLAGS = \
-DGST_USE_UNSTABLE_API=1 \

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,8 @@
/*
* Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
* Copyright (C) 2013, Collabora Ltd.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@ -52,8 +54,6 @@
#pragma pack()
#endif
#include "gstomxrecmutex.h"
G_BEGIN_DECLS
#define GST_OMX_INIT_STRUCT(st) G_STMT_START { \
@ -112,6 +112,7 @@ typedef enum _GstOMXPortDirection GstOMXPortDirection;
typedef struct _GstOMXComponent GstOMXComponent;
typedef struct _GstOMXBuffer GstOMXBuffer;
typedef struct _GstOMXClassData GstOMXClassData;
typedef struct _GstOMXMessage GstOMXMessage;
typedef enum {
/* Everything good and the buffer is valid */
@ -137,8 +138,6 @@ struct _GstOMXCore {
gint user_count; /* LOCK */
/* OpenMAX core library functions, protected with LOCK */
/* FIXME: OpenMAX spec does not specify that this is required
* but gst-openmax does it */
OMX_ERRORTYPE (*init) (void);
OMX_ERRORTYPE (*deinit) (void);
OMX_ERRORTYPE (*get_handle) (OMX_HANDLETYPE * handle,
@ -146,32 +145,51 @@ struct _GstOMXCore {
OMX_ERRORTYPE (*free_handle) (OMX_HANDLETYPE handle);
};
typedef enum {
GST_OMX_MESSAGE_STATE_SET,
GST_OMX_MESSAGE_FLUSH,
GST_OMX_MESSAGE_ERROR,
GST_OMX_MESSAGE_PORT_ENABLE,
GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED,
GST_OMX_MESSAGE_BUFFER_DONE,
} GstOMXMessageType;
struct _GstOMXMessage {
GstOMXMessageType type;
union {
struct {
OMX_STATETYPE state;
} state_set;
struct {
OMX_U32 port;
} flush;
struct {
OMX_ERRORTYPE error;
} error;
struct {
OMX_U32 port;
OMX_BOOL enable;
} port_enable;
struct {
OMX_U32 port;
} port_settings_changed;
struct {
OMX_HANDLETYPE component;
OMX_PTR app_data;
OMX_BUFFERHEADERTYPE *buffer;
OMX_BOOL empty;
} buffer_done;
} content;
};
struct _GstOMXPort {
GstOMXComponent *comp;
guint32 index;
/* Protects port_def, buffers, pending_buffers,
* settings_changed, flushing, flushed, enabled_changed
* and settings_cookie.
*
* Signalled if pending_buffers gets a
* new buffer or flushing/flushed is set
* to TRUE or the port is enabled/disabled
* or the settings change or an error happens.
*
* Note: Always check comp->last_error before
* waiting and after being signalled!
*
* Note: flushed==TRUE implies flushing==TRUE!
*
* Note: This lock must always be taken before
* the component's state lock if both are needed!
*/
GstOMXRecMutex port_lock;
GCond port_cond;
OMX_PARAM_PORTDEFINITIONTYPE port_def;
GPtrArray *buffers; /* Contains GstOMXBuffer* */
GQueue *pending_buffers; /* Contains GstOMXBuffer* */
GQueue pending_buffers; /* Contains GstOMXBuffer* */
/* If TRUE we need to get the new caps of this port */
gboolean settings_changed;
gboolean flushing;
@ -193,15 +211,20 @@ struct _GstOMXComponent {
guint64 hacks; /* Flags, GST_OMX_HACK_* */
/* Added once, never changed. No locks necessary */
GPtrArray *ports; /* Contains GstOMXPort* */
gint n_in_ports, n_out_ports;
/* Protecting state, pending_state, last_error,
* pending_reconfigure_outports.
* Signalled if one of them changes
*/
GstOMXRecMutex state_lock;
GCond state_cond;
/* Locking order: lock -> messages_lock
*
* Never hold lock while waiting for messages_cond
* Always check that messages is empty before waiting */
GMutex lock;
GQueue messages; /* Queue of GstOMXMessages */
GMutex messages_lock;
GCond messages_cond;
OMX_STATETYPE state;
/* OMX_StateInvalid if no pending state */
OMX_STATETYPE pending_state;

View file

@ -901,10 +901,12 @@ gst_omx_audio_enc_sink_event (GstAudioEncoder * encoder, GstEvent * event)
GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers");
/* Insert a NULL into the queue to signal EOS */
gst_omx_rec_mutex_lock (&self->out_port->port_lock);
g_queue_push_tail (self->out_port->pending_buffers, NULL);
g_cond_broadcast (&self->out_port->port_cond);
gst_omx_rec_mutex_unlock (&self->out_port->port_lock);
g_mutex_lock (&self->component->lock);
g_queue_push_tail (&self->out_port->pending_buffers, NULL);
g_mutex_unlock (&self->component->lock);
g_mutex_lock (&self->component->messages_lock);
g_cond_broadcast (&self->component->messages_cond);
g_mutex_unlock (&self->component->messages_lock);
return TRUE;
}

View file

@ -1,144 +0,0 @@
/*
* Copyright (C) 2012, Collabora Ltd.
* Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation
* version 2.1 of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstomxrecmutex.h"
void
gst_omx_rec_mutex_init (GstOMXRecMutex * mutex)
{
g_mutex_init (&mutex->lock);
g_mutex_init (&mutex->recursion_lock);
g_cond_init (&mutex->recursion_wait_cond);
g_atomic_int_set (&mutex->recursion_allowed, FALSE);
g_atomic_int_set (&mutex->recursion_pending, FALSE);
}
void
gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex)
{
g_mutex_clear (&mutex->lock);
g_mutex_clear (&mutex->recursion_lock);
}
void
gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex)
{
g_mutex_lock (&mutex->lock);
}
void
gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex)
{
g_mutex_unlock (&mutex->lock);
}
void
gst_omx_rec_mutex_lock_for_recursion (GstOMXRecMutex * mutex)
{
gboolean exchanged;
g_mutex_lock (&mutex->recursion_lock);
while (g_atomic_int_get (&mutex->recursion_allowed))
g_cond_wait (&mutex->recursion_wait_cond, &mutex->recursion_lock);
g_mutex_lock (&mutex->lock);
exchanged =
g_atomic_int_compare_and_exchange (&mutex->recursion_pending, FALSE,
TRUE);
g_assert (exchanged);
}
void
gst_omx_rec_mutex_unlock_for_recursion (GstOMXRecMutex * mutex)
{
gboolean exchanged;
exchanged =
g_atomic_int_compare_and_exchange (&mutex->recursion_pending, TRUE,
FALSE);
g_assert (exchanged);
g_mutex_unlock (&mutex->lock);
g_cond_broadcast (&mutex->recursion_wait_cond);
g_mutex_unlock (&mutex->recursion_lock);
}
/* must be called with mutex->lock taken */
void
gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex)
{
gboolean exchanged;
exchanged =
g_atomic_int_compare_and_exchange (&mutex->recursion_allowed, FALSE,
TRUE);
g_assert (exchanged);
exchanged =
g_atomic_int_compare_and_exchange (&mutex->recursion_pending, TRUE,
FALSE);
g_assert (exchanged);
g_mutex_unlock (&mutex->recursion_lock);
}
/* must be called with mutex->lock taken */
void
gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex)
{
gboolean exchanged;
g_mutex_lock (&mutex->recursion_lock);
exchanged =
g_atomic_int_compare_and_exchange (&mutex->recursion_allowed, TRUE,
FALSE);
g_assert (exchanged);
exchanged =
g_atomic_int_compare_and_exchange (&mutex->recursion_pending, FALSE,
TRUE);
g_assert (exchanged);
}
void
gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex)
{
g_mutex_lock (&mutex->recursion_lock);
if (!g_atomic_int_get (&mutex->recursion_allowed)) {
/* no recursion allowed, lock the proper mutex */
g_mutex_lock (&mutex->lock);
g_mutex_unlock (&mutex->recursion_lock);
}
}
void
gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex)
{
/* It is safe to check recursion_allowed here because
* we hold at least one of the two locks and
* either lock protects it from being changed.
*/
if (g_atomic_int_get (&mutex->recursion_allowed)) {
g_mutex_unlock (&mutex->recursion_lock);
} else {
g_mutex_unlock (&mutex->lock);
}
}

View file

@ -1,121 +0,0 @@
/*
* Copyright (C) 2012, Collabora Ltd.
* Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation
* version 2.1 of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
*/
#ifndef __GST_OMX_REC_MUTEX_H__
#define __GST_OMX_REC_MUTEX_H__
#include <glib.h>
G_BEGIN_DECLS
/*
* This is a recursive mutex implementation that serves a very specific
* purpose; it is used to allow OpenMAX callbacks to be run in the context
* of some OpenMAX function call while the calling function is holding the
* master lock.
*
* According to the OpenMAX specification, we have 2 possible ways that
* callbacks may be called. First, we have out-of-context calls, which means
* that callbacks are called from a different thread at any point in time.
* In this case, callbacks must take the appropriate lock to protect the data
* that they are changing. Second, we have in-context calls, which means
* that callbacks are called when we call some OpenMAX function, before this
* function returns. In this case, we need to allow the callback to run
* without taking any locks that the caller of the OpenMAX function is holding.
*
* This can be solved by a recusrive mutex. However, a normal GRecMutex is
* not enough because it allows being locked multiple times only from
* the same thread. Unfortunatly, what we see in Broadcom's implementation,
* for instance, OpenMAX callbacks may be in-context, but from a different
* thread. This is achieved like this:
*
* - OMX_Foo is called
* - OMX_Foo waits on a condition
* - The callback is executed in a different thread
* - When the callback returns, its calling function
* signals the condition that OMX_Foo waits on
* - OMX_Foo wakes up and returns
*
* This recursive mutex implementation attempts to fix this problem.
* This is achieved like this: All functions lock this mutex normally
* using gst_omx_rec_mutex_lock() / _unlock(). These functions
* effectively lock the master mutex and they are identical in behavior
* with g_mutex_lock() / _unlock(). When a function that has already
* locked this mutex is about to call some OpenMAX function, it must
* call gst_omx_rec_mutex_begin_recursion() to indicate that recursive
* locking is now allowed, and similarly, call gst_omx_rec_mutex_end_recursion()
* after the OpenMAX function has returned to indicate that no recursive
* locking is allowed from this point on. Callbacks should lock the
* mutex using gst_omx_rec_mutex_recursive_lock() / _recursive_unlock().
* These two functions, depending on whether recursion is allowed
* will take/release either the master lock or the recursion_lock.
* Effectively, this allows callbacks to run in the context any code between
* calls to gst_omx_rec_mutex_begin_recursion() / _end_recursion().
*
* Note that this doesn't prevent out-of-context callback executions
* to be run at that point, but due to the fact that _end_recursion()
* also locks the recursion_lock, it is at least guaranteed that they
* will have finished their execution before _end_recursion() returns.
*/
typedef struct _GstOMXRecMutex GstOMXRecMutex;
struct _GstOMXRecMutex {
/* The master lock */
GMutex lock;
/* This lock is taken when recursing.
* The master lock must always be taken before this one,
* by the caller of _begin_recursion().
*/
GMutex recursion_lock;
/* Indicates whether recursion is allowed.
* When it is allowed, _recursive_lock() takes
* the recursion_lock instead of the master lock.
* This variable is protected by both locks.
*/
volatile gint recursion_allowed;
/* Indicates whether lock is locked and recursion
* will be allowed soon
*/
volatile gint recursion_pending;
GCond recursion_wait_cond;
};
void gst_omx_rec_mutex_init (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_lock_for_recursion (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_unlock_for_recursion (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex);
void gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex);
G_END_DECLS
#endif /* __GST_OMX_REC_MUTEX_H__ */