examples: simplify the thread synchronization code

Make everithing more simple and fix the races conditions remaining in
the previous approaches.
This commit is contained in:
Josep Torra 2013-09-28 13:32:37 +02:00
parent b129376a57
commit 244320fe48

View file

@ -89,6 +89,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#define M_PI 3.141592654
#endif
#define SYNC_BUFFERS TRUE
#define TRACE_VC_MEMORY_ENABLED 0
#if TRACE_VC_MEMORY_ENABLED
@ -168,7 +170,6 @@ typedef struct
/* Interthread comunication */
GAsyncQueue *queue;
GMutex *queue_lock;
GMutex *flow_lock;
GCond *cond;
gboolean flushing;
GstMiniObject *popped_obj;
@ -976,7 +977,6 @@ init_intercom (APP_STATE_T * state)
state->queue =
g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
state->queue_lock = g_mutex_new ();
state->flow_lock = g_mutex_new ();
state->cond = g_cond_new ();
}
@ -992,10 +992,6 @@ terminate_intercom (APP_STATE_T * state)
g_mutex_free (state->queue_lock);
}
if (state->flow_lock) {
g_mutex_free (state->flow_lock);
}
if (state->cond) {
g_cond_free (state->cond);
}
@ -1020,7 +1016,6 @@ flush_start (APP_STATE_T * state)
g_cond_broadcast (state->cond);
g_mutex_unlock (state->queue_lock);
g_mutex_lock (state->flow_lock);
while ((object = g_async_queue_try_pop (state->queue))) {
gst_mini_object_unref (object);
}
@ -1028,7 +1023,6 @@ flush_start (APP_STATE_T * state)
flush_internal (state);
state->popped_obj = NULL;
g_mutex_unlock (state->queue_lock);
g_mutex_unlock (state->flow_lock);
}
static void
@ -1100,32 +1094,28 @@ static gboolean
handle_queued_objects (APP_STATE_T * state)
{
GstMiniObject *object = NULL;
gboolean done = FALSE;
g_mutex_lock (state->queue_lock);
if (state->flushing) {
g_cond_broadcast (state->cond);
done = TRUE;
goto beach;
} else if (g_async_queue_length (state->queue) == 0) {
done = TRUE;
goto beach;
}
g_mutex_unlock (state->queue_lock);
while (!done && (object = g_async_queue_try_pop (state->queue))) {
if ((object = g_async_queue_try_pop (state->queue))) {
if (GST_IS_BUFFER (object)) {
GstBuffer *buffer = GST_BUFFER_CAST (object);
g_mutex_lock (state->queue_lock);
update_image (state, buffer);
render_scene (state);
gst_buffer_unref (buffer);
g_mutex_unlock (state->queue_lock);
if (!SYNC_BUFFERS) {
object = NULL;
}
} else if (GST_IS_QUERY (object)) {
GstQuery *query = GST_QUERY_CAST (object);
GstStructure *s = (GstStructure *) gst_query_get_structure (query);
g_mutex_lock (state->queue_lock);
if (gst_structure_has_name (s, "eglglessink-allocate-eglimage")) {
GstBuffer *buffer;
GstVideoFormat format;
@ -1151,18 +1141,11 @@ handle_queued_objects (APP_STATE_T * state)
} else {
g_assert_not_reached ();
}
state->popped_obj = object;
g_cond_broadcast (state->cond);
g_mutex_unlock (state->queue_lock);
return TRUE;
} else if (GST_IS_EVENT (object)) {
GstEvent *event = GST_EVENT_CAST (object);
g_print ("\nevent %p %s\n", event,
gst_event_type_get_name (GST_EVENT_TYPE (event)));
g_mutex_lock (state->queue_lock);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
flush_internal (state);
@ -1170,18 +1153,19 @@ handle_queued_objects (APP_STATE_T * state)
default:
break;
}
g_mutex_unlock (state->queue_lock);
gst_event_unref (event);
object = NULL;
}
}
g_mutex_lock (state->queue_lock);
if (object) {
state->popped_obj = object;
g_cond_broadcast (state->cond);
g_mutex_unlock (state->queue_lock);
g_mutex_lock (state->flow_lock);
g_mutex_unlock (state->flow_lock);
}
beach:
g_mutex_unlock (state->queue_lock);
return FALSE;
}
@ -1190,14 +1174,13 @@ queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
{
gboolean res = TRUE;
g_mutex_lock (state->flow_lock);
g_mutex_lock (state->queue_lock);
if (state->flushing) {
gst_mini_object_unref (obj);
res = FALSE;
goto beach;
}
g_mutex_lock (state->queue_lock);
g_async_queue_push (state->queue, obj);
if (synchronous) {
@ -1206,10 +1189,9 @@ queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
g_cond_wait (state->cond, state->queue_lock);
} while (!state->flushing && state->popped_obj != obj);
}
g_mutex_unlock (state->queue_lock);
beach:
g_mutex_unlock (state->flow_lock);
g_mutex_unlock (state->queue_lock);
return res;
}
@ -1226,7 +1208,8 @@ buffers_cb (GstElement * fakesink, GstBuffer * buffer, GstPad * pad,
gpointer user_data)
{
APP_STATE_T *state = (APP_STATE_T *) user_data;
queue_object (state, GST_MINI_OBJECT_CAST (gst_buffer_ref (buffer)), TRUE);
queue_object (state, GST_MINI_OBJECT_CAST (gst_buffer_ref (buffer)),
SYNC_BUFFERS);
}
static GstPadProbeReturn