Use a GstTask with start/stop semantics for the decoder thread.

This commit is contained in:
gb 2010-04-29 17:11:32 +00:00 committed by Gwenole Beauchesne
parent 5f5ed724dd
commit 2356ceb0d9
3 changed files with 59 additions and 53 deletions

View file

@ -52,11 +52,8 @@ enum {
PROP_CODEC_DATA PROP_CODEC_DATA
}; };
static gboolean /* Wait _at most_ 10 ms for encoded buffers between each decoding step */
gst_vaapi_decoder_start(GstVaapiDecoder *decoder); #define GST_VAAPI_DECODER_TIMEOUT (10000)
static gboolean
gst_vaapi_decoder_stop(GstVaapiDecoder *decoder);
static GstBuffer * static GstBuffer *
pop_buffer(GstVaapiDecoder *decoder); pop_buffer(GstVaapiDecoder *decoder);
@ -67,33 +64,31 @@ push_surface(GstVaapiDecoder *decoder, GstVaapiSurface *surface);
static DecodedSurface * static DecodedSurface *
pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time); pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time);
static gpointer static void
decoder_thread_cb(gpointer data) decoder_task(gpointer data)
{ {
GstVaapiDecoder * const decoder = data; GstVaapiDecoder * const decoder = GST_VAAPI_DECODER_CAST(data);
GstVaapiDecoderPrivate * const priv = decoder->priv; GstVaapiDecoderPrivate * const priv = decoder->priv;
GstVaapiDecoderClass * const klass = GST_VAAPI_DECODER_GET_CLASS(decoder); GstVaapiDecoderClass * const klass = GST_VAAPI_DECODER_GET_CLASS(decoder);
GstBuffer *buffer; GstBuffer *buffer;
g_object_ref(decoder); buffer = pop_buffer(decoder);
while (!priv->decoder_thread_cancel) { if (!buffer)
buffer = pop_buffer(decoder); return;
priv->decoder_status = klass->decode(decoder, buffer);
GST_DEBUG("decode frame (status = %d)", priv->decoder_status); priv->decoder_status = klass->decode(decoder, buffer);
switch (priv->decoder_status) { GST_DEBUG("decode frame (status = %d)", priv->decoder_status);
case GST_VAAPI_DECODER_STATUS_SUCCESS:
case GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA: switch (priv->decoder_status) {
break; case GST_VAAPI_DECODER_STATUS_SUCCESS:
default: case GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA:
/* Send an empty surface to signal an error */ break;
push_surface(decoder, NULL); default:
priv->decoder_thread_cancel = TRUE; /* Send an empty surface to signal an error */
break; push_surface(decoder, NULL);
} gst_task_pause(priv->decoder_task);
gst_buffer_unref(buffer); break;
} }
g_object_unref(decoder);
return NULL;
} }
static void static void
@ -182,9 +177,6 @@ push_buffer(GstVaapiDecoder *decoder, GstBuffer *buffer)
GST_DEBUG("queue encoded data buffer %p (%d bytes)", GST_DEBUG("queue encoded data buffer %p (%d bytes)",
buffer, GST_BUFFER_SIZE(buffer)); buffer, GST_BUFFER_SIZE(buffer));
if (!priv->decoder_thread && !gst_vaapi_decoder_start(decoder))
return FALSE;
g_async_queue_push(priv->buffers, buffer); g_async_queue_push(priv->buffers, buffer);
return TRUE; return TRUE;
} }
@ -193,10 +185,15 @@ static GstBuffer *
pop_buffer(GstVaapiDecoder *decoder) pop_buffer(GstVaapiDecoder *decoder)
{ {
GstVaapiDecoderPrivate * const priv = decoder->priv; GstVaapiDecoderPrivate * const priv = decoder->priv;
GTimeVal end_time;
GstBuffer *buffer; GstBuffer *buffer;
buffer = g_async_queue_pop(priv->buffers); g_get_current_time(&end_time);
g_return_val_if_fail(buffer, NULL); g_time_val_add(&end_time, GST_VAAPI_DECODER_TIMEOUT);
buffer = g_async_queue_timed_pop(priv->buffers, &end_time);
if (!buffer)
return NULL;
GST_DEBUG("dequeue buffer %p for decoding (%d bytes)", GST_DEBUG("dequeue buffer %p for decoding (%d bytes)",
buffer, GST_BUFFER_SIZE(buffer)); buffer, GST_BUFFER_SIZE(buffer));
@ -251,6 +248,9 @@ pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time)
{ {
GstVaapiDecoderPrivate * const priv = decoder->priv; GstVaapiDecoderPrivate * const priv = decoder->priv;
if (!gst_vaapi_decoder_start(decoder))
return NULL;
return g_async_queue_timed_pop(priv->surfaces, end_time); return g_async_queue_timed_pop(priv->surfaces, end_time);
} }
@ -423,8 +423,9 @@ gst_vaapi_decoder_init(GstVaapiDecoder *decoder)
priv->surface_duration = GST_CLOCK_TIME_NONE; priv->surface_duration = GST_CLOCK_TIME_NONE;
priv->buffers = g_async_queue_new(); priv->buffers = g_async_queue_new();
priv->surfaces = g_async_queue_new(); priv->surfaces = g_async_queue_new();
priv->decoder_thread = NULL; priv->decoder_task = NULL;
priv->decoder_thread_cancel = FALSE;
g_static_rec_mutex_init(&priv->decoder_task_lock);
} }
/** /**
@ -442,16 +443,15 @@ gst_vaapi_decoder_start(GstVaapiDecoder *decoder)
/* This is an internal function */ /* This is an internal function */
GstVaapiDecoderPrivate * const priv = decoder->priv; GstVaapiDecoderPrivate * const priv = decoder->priv;
if (!priv->decoder_thread) { if (priv->decoder_task)
priv->decoder_thread = g_thread_create( return TRUE;
decoder_thread_cb, decoder,
TRUE, priv->decoder_task = gst_task_create(decoder_task, decoder);
NULL if (!priv->decoder_task)
); return FALSE;
if (!priv->decoder_thread)
return FALSE; gst_task_set_lock(priv->decoder_task, &priv->decoder_task_lock);
} return gst_task_start(priv->decoder_task);
return TRUE;
} }
/** /**
@ -470,14 +470,14 @@ gst_vaapi_decoder_stop(GstVaapiDecoder *decoder)
{ {
/* This is an internal function */ /* This is an internal function */
GstVaapiDecoderPrivate * const priv = decoder->priv; GstVaapiDecoderPrivate * const priv = decoder->priv;
gboolean success;
if (priv->decoder_thread) { if (!priv->decoder_task)
push_buffer(decoder, NULL); return FALSE;
priv->decoder_thread_cancel = TRUE;
g_thread_join(priv->decoder_thread); success = gst_task_join(priv->decoder_task);
priv->decoder_thread = NULL; priv->decoder_task = NULL;
} return success;
return TRUE;
} }
/** /**

View file

@ -124,6 +124,12 @@ gst_vaapi_decoder_set_frame_rate(
guint den guint den
); );
gboolean
gst_vaapi_decoder_start(GstVaapiDecoder *decoder);
gboolean
gst_vaapi_decoder_stop(GstVaapiDecoder *decoder);
gboolean gboolean
gst_vaapi_decoder_put_buffer_data( gst_vaapi_decoder_put_buffer_data(
GstVaapiDecoder *decoder, GstVaapiDecoder *decoder,

View file

@ -22,7 +22,7 @@
#define GST_VAAPI_DECODER_PRIV_H #define GST_VAAPI_DECODER_PRIV_H
#include <glib.h> #include <glib.h>
#include <gst/base/gstadapter.h> #include <gst/gsttask.h>
#include <gst/vaapi/gstvaapidecoder.h> #include <gst/vaapi/gstvaapidecoder.h>
#include <gst/vaapi/gstvaapicontext.h> #include <gst/vaapi/gstvaapicontext.h>
@ -97,9 +97,9 @@ struct _GstVaapiDecoderPrivate {
GAsyncQueue *surfaces; GAsyncQueue *surfaces;
GstClockTime surface_timestamp; GstClockTime surface_timestamp;
GstClockTime surface_duration; GstClockTime surface_duration;
GThread *decoder_thread; GstTask *decoder_task;
GStaticRecMutex decoder_task_lock;
GstVaapiDecoderStatus decoder_status; GstVaapiDecoderStatus decoder_status;
guint decoder_thread_cancel : 1;
}; };
gboolean gboolean