Drop excessive threading that over-complicates synchronisation.

MPEG-2 & H.264 videos now play but there are other problems (timestamps).
This commit is contained in:
gb 2010-04-30 15:37:28 +00:00 committed by Gwenole Beauchesne
parent 4c7c0307c4
commit a4d201aaf9
6 changed files with 102 additions and 331 deletions

View file

@ -52,45 +52,6 @@ enum {
PROP_CODEC_DATA
};
/* Wait _at most_ 10 ms for encoded buffers between each decoding step */
#define GST_VAAPI_DECODER_TIMEOUT (10000)
static GstBuffer *
pop_buffer(GstVaapiDecoder *decoder);
static gboolean
push_surface(GstVaapiDecoder *decoder, GstVaapiSurface *surface);
static DecodedSurface *
pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time);
static void
decoder_task(gpointer data)
{
GstVaapiDecoder * const decoder = GST_VAAPI_DECODER_CAST(data);
GstVaapiDecoderPrivate * const priv = decoder->priv;
GstVaapiDecoderClass * const klass = GST_VAAPI_DECODER_GET_CLASS(decoder);
GstBuffer *buffer;
buffer = pop_buffer(decoder);
if (!buffer)
return;
priv->decoder_status = klass->decode(decoder, buffer);
GST_DEBUG("decode frame (status = %d)", priv->decoder_status);
switch (priv->decoder_status) {
case GST_VAAPI_DECODER_STATUS_SUCCESS:
case GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA:
break;
default:
/* Send an empty surface to signal an error */
push_surface(decoder, NULL);
gst_task_pause(priv->decoder_task);
break;
}
}
static void
update_clock(GstVaapiDecoder *decoder, GstBuffer *buffer)
{
@ -177,7 +138,7 @@ push_buffer(GstVaapiDecoder *decoder, GstBuffer *buffer)
GST_DEBUG("queue encoded data buffer %p (%d bytes)",
buffer, GST_BUFFER_SIZE(buffer));
g_async_queue_push(priv->buffers, buffer);
g_queue_push_tail(priv->buffers, buffer);
return TRUE;
}
@ -185,13 +146,9 @@ static GstBuffer *
pop_buffer(GstVaapiDecoder *decoder)
{
GstVaapiDecoderPrivate * const priv = decoder->priv;
GTimeVal end_time;
GstBuffer *buffer;
g_get_current_time(&end_time);
g_time_val_add(&end_time, GST_VAAPI_DECODER_TIMEOUT);
buffer = g_async_queue_timed_pop(priv->buffers, &end_time);
buffer = g_queue_pop_head(priv->buffers);
if (!buffer)
return NULL;
@ -202,6 +159,28 @@ pop_buffer(GstVaapiDecoder *decoder)
return buffer;
}
static GstVaapiDecoderStatus
decode_step(GstVaapiDecoder *decoder)
{
GstVaapiDecoderStatus status;
GstBuffer *buffer;
do {
buffer = pop_buffer(decoder);
if (!buffer)
return GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA;
status = GST_VAAPI_DECODER_GET_CLASS(decoder)->decode(decoder, buffer);
GST_DEBUG("decode frame (status = %d)", status);
if (status == GST_VAAPI_DECODER_STATUS_SUCCESS)
return status;
if (GST_BUFFER_IS_EOS(buffer))
return GST_VAAPI_DECODER_STATUS_END_OF_STREAM;
} while (status == GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA);
return status;
}
static inline DecodedSurface *
create_surface(void)
{
@ -224,34 +203,26 @@ push_surface(GstVaapiDecoder *decoder, GstVaapiSurface *surface)
if (!ds)
return FALSE;
if (surface) {
GST_DEBUG("queue decoded surface %" GST_VAAPI_ID_FORMAT,
GST_VAAPI_ID_ARGS(GST_VAAPI_OBJECT_ID(surface)));
ds->proxy = gst_vaapi_surface_proxy_new(priv->context, surface);
if (ds->proxy) {
ds->status = GST_VAAPI_DECODER_STATUS_SUCCESS;
gst_vaapi_surface_proxy_set_timestamp(
ds->proxy, priv->surface_timestamp);
}
else
ds->status = GST_VAAPI_DECODER_STATUS_ERROR_ALLOCATION_FAILED;
GST_DEBUG("queue decoded surface %" GST_VAAPI_ID_FORMAT,
GST_VAAPI_ID_ARGS(GST_VAAPI_OBJECT_ID(surface)));
ds->proxy = gst_vaapi_surface_proxy_new(priv->context, surface);
if (ds->proxy) {
ds->status = GST_VAAPI_DECODER_STATUS_SUCCESS;
gst_vaapi_surface_proxy_set_timestamp(ds->proxy, priv->surface_timestamp);
}
else
ds->status = priv->decoder_status;
ds->status = GST_VAAPI_DECODER_STATUS_ERROR_ALLOCATION_FAILED;
g_async_queue_push(priv->surfaces, ds);
g_queue_push_tail(priv->surfaces, ds);
return TRUE;
}
static inline DecodedSurface *
pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time)
pop_surface(GstVaapiDecoder *decoder)
{
GstVaapiDecoderPrivate * const priv = decoder->priv;
if (!gst_vaapi_decoder_start(decoder))
return NULL;
return g_async_queue_timed_pop(priv->surfaces, end_time);
return g_queue_pop_head(priv->surfaces);
}
static inline void
@ -269,12 +240,10 @@ set_codec_data(GstVaapiDecoder *decoder, GstBuffer *codec_data)
}
static void
clear_async_queue(GAsyncQueue *q, GDestroyNotify destroy)
clear_queue(GQueue *q, GDestroyNotify destroy)
{
guint i, qlen = g_async_queue_length(q);
for (i = 0; i < qlen; i++)
destroy(g_async_queue_pop(q));
while (!g_queue_is_empty(q))
destroy(g_queue_pop_head(q));
}
static void
@ -283,8 +252,6 @@ gst_vaapi_decoder_finalize(GObject *object)
GstVaapiDecoder * const decoder = GST_VAAPI_DECODER(object);
GstVaapiDecoderPrivate * const priv = decoder->priv;
gst_vaapi_decoder_stop(decoder);
set_codec_data(decoder, NULL);
if (priv->context) {
@ -293,14 +260,14 @@ gst_vaapi_decoder_finalize(GObject *object)
}
if (priv->buffers) {
clear_async_queue(priv->buffers, (GDestroyNotify)gst_buffer_unref);
g_async_queue_unref(priv->buffers);
clear_queue(priv->buffers, (GDestroyNotify)gst_buffer_unref);
g_queue_free(priv->buffers);
priv->buffers = NULL;
}
if (priv->surfaces) {
clear_async_queue(priv->surfaces, (GDestroyNotify)destroy_surface);
g_async_queue_unref(priv->surfaces);
clear_queue(priv->surfaces, (GDestroyNotify)destroy_surface);
g_queue_free(priv->surfaces);
priv->surfaces = NULL;
}
@ -421,81 +388,8 @@ gst_vaapi_decoder_init(GstVaapiDecoder *decoder)
priv->fps_d = 30;
priv->surface_timestamp = GST_CLOCK_TIME_NONE;
priv->surface_duration = GST_CLOCK_TIME_NONE;
priv->buffers = g_async_queue_new();
priv->surfaces = g_async_queue_new();
priv->decoder_task = NULL;
g_static_rec_mutex_init(&priv->decoder_task_lock);
}
/**
* gst_vaapi_decoder_start:
* @decoder: a #GstVaapiDecoder
*
* Starts the decoder. This creates the internal decoder thread, if
* necessary.
*
* Return value: %TRUE on success
*/
gboolean
gst_vaapi_decoder_start(GstVaapiDecoder *decoder)
{
g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE);
if (decoder->priv->decoder_task)
return TRUE;
decoder->priv->decoder_task = gst_task_create(decoder_task, decoder);
if (!decoder->priv->decoder_task)
return FALSE;
gst_task_set_lock(decoder->priv->decoder_task, &decoder->priv->decoder_task_lock);
return gst_task_start(decoder->priv->decoder_task);
}
/**
* gst_vaapi_decoder_pause:
* @decoder: a #GstVaapiDecoder
*
* Pauses the decoder. It can be made active again through
* gst_vaapi_decoder_start() or definitely stopped through
* gst_vaapi_decoder_stop().
*
* Return value: %TRUE on success
*/
gboolean
gst_vaapi_decoder_pause(GstVaapiDecoder *decoder)
{
g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE);
return gst_task_pause(decoder->priv->decoder_task);
}
/**
* gst_vaapi_decoder_stop:
* @decoder: a #GstVaapiDecoder
*
* Stops the decoder. This destroys any decoding thread that was
* previously created by gst_vaapi_decoder_start(). Only
* gst_vaapi_decoder_get_surface() on the queued surfaces will be
* allowed at this point.
*
* Return value: %FALSE on success
*/
gboolean
gst_vaapi_decoder_stop(GstVaapiDecoder *decoder)
{
gboolean success;
g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE);
if (!decoder->priv->decoder_task)
return FALSE;
success = gst_task_join(decoder->priv->decoder_task);
g_object_unref(decoder->priv->decoder_task);
decoder->priv->decoder_task = NULL;
return success;
priv->buffers = g_queue_new();
priv->surfaces = g_queue_new();
}
/**
@ -625,82 +519,42 @@ gst_vaapi_decoder_put_buffer(GstVaapiDecoder *decoder, GstBuffer *buf)
* @decoder: a #GstVaapiDecoder
* @pstatus: return location for the decoder status, or %NULL
*
* Waits for a decoded surface to arrive. This functions blocks until
* the @decoder has a surface ready for the caller. @pstatus is
* optional but it can help to know what went wrong during the
* decoding process.
* Flushes encoded buffers to the decoder and returns a decoded
* surface, if any.
*
* Return value: a #GstVaapiSurfaceProxy holding the decoded surface,
* or %NULL if none is available (e.g. an error). Caller owns the
* returned object. g_object_unref() after usage.
*/
static GstVaapiSurfaceProxy *
_gst_vaapi_decoder_get_surface(
GstVaapiDecoder *decoder,
GTimeVal *end_time,
GstVaapiDecoderStatus *pstatus
)
{
GstVaapiDecoderStatus status;
GstVaapiSurfaceProxy *proxy;
DecodedSurface *ds;
ds = pop_surface(decoder, end_time);
if (ds) {
proxy = ds->proxy;
status = ds->status;
destroy_surface(ds);
}
else {
proxy = NULL;
status = GST_VAAPI_DECODER_STATUS_TIMEOUT;
}
if (pstatus)
*pstatus = status;
return proxy;
}
GstVaapiSurfaceProxy *
gst_vaapi_decoder_get_surface(
GstVaapiDecoder *decoder,
GstVaapiDecoderStatus *pstatus
)
{
g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), NULL);
return _gst_vaapi_decoder_get_surface(decoder, NULL, pstatus);
}
/**
* gst_vaapi_decoder_timed_get_surface:
* @decoder: a #GstVaapiDecoder
* @timeout: the number of microseconds to wait for the decoded surface
* @pstatus: return location for the decoder status, or %NULL
*
* Waits for a decoded surface to arrive. This function blocks for at
* least @timeout microseconds. @pstatus is optional but it can help
* to know what went wrong during the decoding process.
*
* Return value: a #GstVaapiSurfaceProxy holding the decoded surface,
* or %NULL if none is available (e.g. an error). Caller owns the
* returned object. g_object_unref() after usage.
*/
GstVaapiSurfaceProxy *
gst_vaapi_decoder_timed_get_surface(
GstVaapiDecoder *decoder,
guint32 timeout,
GstVaapiDecoderStatus *pstatus
)
{
GTimeVal end_time;
GstVaapiSurfaceProxy *proxy = NULL;
GstVaapiDecoderStatus status = GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA;
DecodedSurface *ds;
g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), NULL);
g_get_current_time(&end_time);
g_time_val_add(&end_time, timeout);
ds = pop_surface(decoder);
if (!ds) {
do {
status = decode_step(decoder);
} while (status == GST_VAAPI_DECODER_STATUS_SUCCESS);
ds = pop_surface(decoder);
}
return _gst_vaapi_decoder_get_surface(decoder, &end_time, pstatus);
if (ds) {
proxy = ds->proxy;
status = ds->status;
destroy_surface(ds);
}
if (pstatus)
*pstatus = status;
return proxy;
}
gboolean

View file

@ -156,13 +156,6 @@ gst_vaapi_decoder_get_surface(
GstVaapiDecoderStatus *pstatus
);
GstVaapiSurfaceProxy *
gst_vaapi_decoder_timed_get_surface(
GstVaapiDecoder *decoder,
guint32 timeout,
GstVaapiDecoderStatus *pstatus
);
G_END_DECLS
#endif /* GST_VAAPI_DECODER_H */

View file

@ -87,19 +87,16 @@ G_BEGIN_DECLS
GstVaapiDecoderPrivate))
struct _GstVaapiDecoderPrivate {
GstVaapiDisplay *display;
GstVaapiContext *context;
GstVaapiCodec codec;
GstBuffer *codec_data;
guint fps_n;
guint fps_d;
GAsyncQueue *buffers;
GAsyncQueue *surfaces;
GstClockTime surface_timestamp;
GstClockTime surface_duration;
GstTask *decoder_task;
GStaticRecMutex decoder_task_lock;
GstVaapiDecoderStatus decoder_status;
GstVaapiDisplay *display;
GstVaapiContext *context;
GstVaapiCodec codec;
GstBuffer *codec_data;
guint fps_n;
guint fps_d;
GQueue *buffers;
GQueue *surfaces;
GstClockTime surface_timestamp;
GstClockTime surface_duration;
};
gboolean

View file

@ -91,18 +91,21 @@ enum {
PROP_USE_FFMPEG,
};
static void
gst_vaapidecode_task_cb(gpointer data)
static GstFlowReturn
gst_vaapidecode_step(GstVaapiDecode *decode)
{
GstVaapiDecode * const decode = GST_VAAPIDECODE(data);
GstVaapiDecoderStatus status;
GstVaapiSurfaceProxy *proxy;
GstVaapiDecoderStatus status;
GstBuffer *buffer;
GstFlowReturn ret;
proxy = gst_vaapi_decoder_timed_get_surface(decode->decoder, 10000, &status);
if (!proxy || status != GST_VAAPI_DECODER_STATUS_SUCCESS)
return;
proxy = gst_vaapi_decoder_get_surface(decode->decoder, &status);
if (!proxy) {
if (status != GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA)
goto error_decode;
/* More data is needed */
return GST_FLOW_OK;
}
buffer = NULL;
ret = gst_pad_alloc_buffer(
@ -125,9 +128,14 @@ gst_vaapidecode_task_cb(gpointer data)
goto error_commit_buffer;
g_object_unref(proxy);
return;
return GST_FLOW_OK;
/* ERRORS */
error_decode:
{
GST_DEBUG("decode error %d", status);
return GST_FLOW_UNEXPECTED;
}
error_create_buffer:
{
const GstVaapiID surface_id =
@ -137,52 +145,16 @@ error_create_buffer:
"surface %" GST_VAAPI_ID_FORMAT " (error %d)",
GST_VAAPI_ID_ARGS(surface_id), ret);
g_object_unref(proxy);
return;
return GST_FLOW_UNEXPECTED;
}
error_commit_buffer:
{
GST_DEBUG("video sink rejected the video buffer (error %d)", ret);
g_object_unref(proxy);
return;
return GST_FLOW_UNEXPECTED;
}
}
static gboolean
gst_vaapidecode_start(GstVaapiDecode *decode)
{
if (gst_task_get_state(decode->decoder_task) == GST_TASK_STARTED)
return TRUE;
GST_DEBUG("start decoding threads");
if (!gst_vaapi_decoder_start(decode->decoder))
return FALSE;
return gst_task_start(decode->decoder_task);
}
static gboolean
gst_vaapidecode_pause(GstVaapiDecode *decode)
{
if (gst_task_get_state(decode->decoder_task) == GST_TASK_PAUSED)
return TRUE;
GST_DEBUG("pause decoding threads");
if (!gst_vaapi_decoder_pause(decode->decoder))
return FALSE;
return gst_task_pause(decode->decoder_task);
}
static gboolean
gst_vaapidecode_stop(GstVaapiDecode *decode)
{
if (gst_task_get_state(decode->decoder_task) == GST_TASK_STOPPED)
return TRUE;
GST_DEBUG("stop decoding threads");
if (!gst_vaapi_decoder_stop(decode->decoder))
return FALSE;
return gst_task_stop(decode->decoder_task);
}
static gboolean
gst_vaapidecode_create(GstVaapiDecode *decode)
{
@ -208,26 +180,12 @@ gst_vaapidecode_create(GstVaapiDecode *decode)
if (decode->use_ffmpeg)
decode->decoder =
gst_vaapi_decoder_ffmpeg_new(display, codec, decode->codec_data);
if (!decode->decoder)
return FALSE;
decode->decoder_task = gst_task_create(gst_vaapidecode_task_cb, decode);
if (!decode->decoder_task)
return FALSE;
gst_task_set_lock(decode->decoder_task, &decode->decoder_task_lock);
return TRUE;
return decode->decoder != NULL;
}
static void
gst_vaapidecode_destroy(GstVaapiDecode *decode)
{
if (decode->decoder_task) {
gst_task_join(decode->decoder_task);
g_object_unref(decode->decoder_task);
decode->decoder_task = NULL;
}
if (decode->decoder) {
gst_vaapi_decoder_put_buffer(decode->decoder, NULL);
g_object_unref(decode->decoder);
@ -322,8 +280,6 @@ gst_vaapidecode_change_state(GstElement *element, GstStateChange transition)
case GST_STATE_CHANGE_READY_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
if (!gst_vaapidecode_start(decode))
return GST_STATE_CHANGE_FAILURE;
break;
default:
break;
@ -335,12 +291,8 @@ gst_vaapidecode_change_state(GstElement *element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
if (!gst_vaapidecode_pause(decode))
return GST_STATE_CHANGE_FAILURE;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (!gst_vaapidecode_stop(decode))
return GST_STATE_CHANGE_FAILURE;
break;
default:
break;
@ -424,14 +376,11 @@ gst_vaapidecode_chain(GstPad *pad, GstBuffer *buf)
goto error_create_decoder;
}
if (!gst_vaapidecode_start(decode))
goto error_start_decoder;
if (!gst_vaapi_decoder_put_buffer(decode->decoder, buf))
goto error_push_buffer;
gst_buffer_unref(buf);
return GST_FLOW_OK;
return gst_vaapidecode_step(decode);
/* ERRORS */
error_create_decoder:
@ -440,12 +389,6 @@ error_create_decoder:
gst_buffer_unref(buf);
return GST_FLOW_UNEXPECTED;
}
error_start_decoder:
{
GST_DEBUG("failed to start decoder");
gst_buffer_unref(buf);
return GST_FLOW_UNEXPECTED;
}
error_push_buffer:
{
GST_DEBUG("failed to push input buffer to decoder");
@ -481,14 +424,11 @@ gst_vaapidecode_init(GstVaapiDecode *decode, GstVaapiDecodeClass *klass)
{
GstElementClass * const element_class = GST_ELEMENT_CLASS(klass);
decode->display = NULL;
decode->profile = 0;
decode->codec_data = NULL;
decode->decoder = NULL;
decode->decoder_task = NULL;
decode->use_ffmpeg = TRUE;
g_static_rec_mutex_init(&decode->decoder_task_lock);
decode->display = NULL;
decode->profile = 0;
decode->codec_data = NULL;
decode->decoder = NULL;
decode->use_ffmpeg = TRUE;
/* Pad through which data comes in to the element */
decode->sinkpad = gst_pad_new_from_template(

View file

@ -65,8 +65,6 @@ struct _GstVaapiDecode {
GstVaapiProfile profile;
GstBuffer *codec_data;
GstVaapiDecoder *decoder;
GstTask *decoder_task;
GStaticRecMutex decoder_task_lock;
unsigned int use_ffmpeg : 1;
};

View file

@ -28,10 +28,6 @@
#include "test-h264.h"
#include "test-vc1.h"
/* Default timeout to wait for the first decoded frame (10 ms) */
/* Defined to -1 if the application indefintely waits for the decoded frame */
#define TIMEOUT -1
typedef void (*GetVideoDataFunc)(const guchar **data, guint *size);
typedef struct _CodecDefs CodecDefs;
@ -123,16 +119,9 @@ main(int argc, char *argv[])
if (!gst_vaapi_decoder_put_buffer(decoder, NULL))
g_error("could not send EOS to the decoder");
if (TIMEOUT < 0) {
proxy = gst_vaapi_decoder_get_surface(decoder, &status);
if (!proxy)
g_error("could not get decoded surface");
}
else {
proxy = gst_vaapi_decoder_timed_get_surface(decoder, TIMEOUT, &status);
if (!proxy)
g_warning("Could not get decoded surface after %d us", TIMEOUT);
}
proxy = gst_vaapi_decoder_get_surface(decoder, &status);
if (!proxy)
g_error("could not get decoded surface (decoder status %d)", status);
gst_vaapi_window_show(window);