diff --git a/gst-libs/gst/vaapi/gstvaapidecoder.c b/gst-libs/gst/vaapi/gstvaapidecoder.c index 97657fc8f0..568212bacd 100644 --- a/gst-libs/gst/vaapi/gstvaapidecoder.c +++ b/gst-libs/gst/vaapi/gstvaapidecoder.c @@ -52,11 +52,8 @@ enum { PROP_CODEC_DATA }; -static gboolean -gst_vaapi_decoder_start(GstVaapiDecoder *decoder); - -static gboolean -gst_vaapi_decoder_stop(GstVaapiDecoder *decoder); +/* Wait _at most_ 10 ms for encoded buffers between each decoding step */ +#define GST_VAAPI_DECODER_TIMEOUT (10000) static GstBuffer * pop_buffer(GstVaapiDecoder *decoder); @@ -67,33 +64,31 @@ push_surface(GstVaapiDecoder *decoder, GstVaapiSurface *surface); static DecodedSurface * pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time); -static gpointer -decoder_thread_cb(gpointer data) +static void +decoder_task(gpointer data) { - GstVaapiDecoder * const decoder = data; + GstVaapiDecoder * const decoder = GST_VAAPI_DECODER_CAST(data); GstVaapiDecoderPrivate * const priv = decoder->priv; GstVaapiDecoderClass * const klass = GST_VAAPI_DECODER_GET_CLASS(decoder); GstBuffer *buffer; - g_object_ref(decoder); - while (!priv->decoder_thread_cancel) { - buffer = pop_buffer(decoder); - priv->decoder_status = klass->decode(decoder, buffer); - GST_DEBUG("decode frame (status = %d)", priv->decoder_status); - switch (priv->decoder_status) { - case GST_VAAPI_DECODER_STATUS_SUCCESS: - case GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA: - break; - default: - /* Send an empty surface to signal an error */ - push_surface(decoder, NULL); - priv->decoder_thread_cancel = TRUE; - break; - } - gst_buffer_unref(buffer); + buffer = pop_buffer(decoder); + if (!buffer) + return; + + priv->decoder_status = klass->decode(decoder, buffer); + GST_DEBUG("decode frame (status = %d)", priv->decoder_status); + + switch (priv->decoder_status) { + case GST_VAAPI_DECODER_STATUS_SUCCESS: + case GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA: + break; + default: + /* Send an empty surface to signal an error */ + push_surface(decoder, NULL); + gst_task_pause(priv->decoder_task); + break; } - g_object_unref(decoder); - return NULL; } static void @@ -182,9 +177,6 @@ push_buffer(GstVaapiDecoder *decoder, GstBuffer *buffer) GST_DEBUG("queue encoded data buffer %p (%d bytes)", buffer, GST_BUFFER_SIZE(buffer)); - if (!priv->decoder_thread && !gst_vaapi_decoder_start(decoder)) - return FALSE; - g_async_queue_push(priv->buffers, buffer); return TRUE; } @@ -193,10 +185,15 @@ static GstBuffer * pop_buffer(GstVaapiDecoder *decoder) { GstVaapiDecoderPrivate * const priv = decoder->priv; + GTimeVal end_time; GstBuffer *buffer; - buffer = g_async_queue_pop(priv->buffers); - g_return_val_if_fail(buffer, NULL); + g_get_current_time(&end_time); + g_time_val_add(&end_time, GST_VAAPI_DECODER_TIMEOUT); + + buffer = g_async_queue_timed_pop(priv->buffers, &end_time); + if (!buffer) + return NULL; GST_DEBUG("dequeue buffer %p for decoding (%d bytes)", buffer, GST_BUFFER_SIZE(buffer)); @@ -251,6 +248,9 @@ pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time) { GstVaapiDecoderPrivate * const priv = decoder->priv; + if (!gst_vaapi_decoder_start(decoder)) + return NULL; + return g_async_queue_timed_pop(priv->surfaces, end_time); } @@ -423,8 +423,9 @@ gst_vaapi_decoder_init(GstVaapiDecoder *decoder) priv->surface_duration = GST_CLOCK_TIME_NONE; priv->buffers = g_async_queue_new(); priv->surfaces = g_async_queue_new(); - priv->decoder_thread = NULL; - priv->decoder_thread_cancel = FALSE; + priv->decoder_task = NULL; + + g_static_rec_mutex_init(&priv->decoder_task_lock); } /** @@ -442,16 +443,15 @@ gst_vaapi_decoder_start(GstVaapiDecoder *decoder) /* This is an internal function */ GstVaapiDecoderPrivate * const priv = decoder->priv; - if (!priv->decoder_thread) { - priv->decoder_thread = g_thread_create( - decoder_thread_cb, decoder, - TRUE, - NULL - ); - if (!priv->decoder_thread) - return FALSE; - } - return TRUE; + if (priv->decoder_task) + return TRUE; + + priv->decoder_task = gst_task_create(decoder_task, decoder); + if (!priv->decoder_task) + return FALSE; + + gst_task_set_lock(priv->decoder_task, &priv->decoder_task_lock); + return gst_task_start(priv->decoder_task); } /** @@ -470,14 +470,14 @@ gst_vaapi_decoder_stop(GstVaapiDecoder *decoder) { /* This is an internal function */ GstVaapiDecoderPrivate * const priv = decoder->priv; + gboolean success; - if (priv->decoder_thread) { - push_buffer(decoder, NULL); - priv->decoder_thread_cancel = TRUE; - g_thread_join(priv->decoder_thread); - priv->decoder_thread = NULL; - } - return TRUE; + if (!priv->decoder_task) + return FALSE; + + success = gst_task_join(priv->decoder_task); + priv->decoder_task = NULL; + return success; } /** diff --git a/gst-libs/gst/vaapi/gstvaapidecoder.h b/gst-libs/gst/vaapi/gstvaapidecoder.h index b11cf00d13..f526cde980 100644 --- a/gst-libs/gst/vaapi/gstvaapidecoder.h +++ b/gst-libs/gst/vaapi/gstvaapidecoder.h @@ -124,6 +124,12 @@ gst_vaapi_decoder_set_frame_rate( guint den ); +gboolean +gst_vaapi_decoder_start(GstVaapiDecoder *decoder); + +gboolean +gst_vaapi_decoder_stop(GstVaapiDecoder *decoder); + gboolean gst_vaapi_decoder_put_buffer_data( GstVaapiDecoder *decoder, diff --git a/gst-libs/gst/vaapi/gstvaapidecoder_priv.h b/gst-libs/gst/vaapi/gstvaapidecoder_priv.h index be28904308..a27199561a 100644 --- a/gst-libs/gst/vaapi/gstvaapidecoder_priv.h +++ b/gst-libs/gst/vaapi/gstvaapidecoder_priv.h @@ -22,7 +22,7 @@ #define GST_VAAPI_DECODER_PRIV_H #include -#include +#include #include #include @@ -97,9 +97,9 @@ struct _GstVaapiDecoderPrivate { GAsyncQueue *surfaces; GstClockTime surface_timestamp; GstClockTime surface_duration; - GThread *decoder_thread; + GstTask *decoder_task; + GStaticRecMutex decoder_task_lock; GstVaapiDecoderStatus decoder_status; - guint decoder_thread_cancel : 1; }; gboolean