vtenc: Move output loop to a separate thread

vtenc has an async output queue that we only iterate over after another frame is enqueued.
At the very least, this means we're always one frame behind the fastest possible output.
In edge cases it's also bug-prone: for example, if we only ever get one frame, downstream
caps negotiation will never happen.

This commit adds a separate task running on the source pad, which only iterates over the output queue
and pushes frames out as soon as they're put there. The queue length is limited to ensure we don't encode
too far ahead of what downstream can consume. Any failure that occurs when pushing data downstream
is signalled in self->downstream_ret so that other parts of the code can act accordingly.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4967>
Piotr Brzeziński 2023-07-04 17:39:58 +02:00 committed by GStreamer Marge Bot
parent bfd4b20fe3
commit dc494d9edb
2 changed files with 331 additions and 94 deletions
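To make the queueing pattern from the commit message concrete, here is a minimal, self-contained sketch of the same producer/consumer idea: one thread enqueues "frames" into a bounded queue and blocks while it is full, while a consumer thread pops them and pushes them out as soon as they arrive. It uses plain GLib types (GQueue, GMutex, GCond) and made-up names (frame_producer, output_loop, MAX_QUEUED), so it only illustrates the synchronization pattern, not the actual patch below.

#include <glib.h>

#define MAX_QUEUED 3            /* same idea as VTENC_OUTPUT_QUEUE_SIZE */
#define NUM_FRAMES 10

static GQueue frame_queue = G_QUEUE_INIT;
static GMutex queue_lock;
static GCond queue_cond;
static gboolean eos = FALSE;

/* Stands in for the encoder callback: enqueues output and blocks
 * while the queue is full so encoding never runs too far ahead. */
static gpointer
frame_producer (gpointer data)
{
  guint i;

  for (i = 1; i <= NUM_FRAMES; i++) {
    g_mutex_lock (&queue_lock);
    while (g_queue_get_length (&frame_queue) >= MAX_QUEUED)
      g_cond_wait (&queue_cond, &queue_lock);
    g_queue_push_tail (&frame_queue, GUINT_TO_POINTER (i));
    g_cond_signal (&queue_cond);
    g_mutex_unlock (&queue_lock);
  }

  g_mutex_lock (&queue_lock);
  eos = TRUE;
  g_cond_signal (&queue_cond);
  g_mutex_unlock (&queue_lock);
  return NULL;
}

/* Stands in for the output loop: pushes frames out as soon as they
 * appear instead of waiting for the next input frame to arrive. */
static gpointer
output_loop (gpointer data)
{
  for (;;) {
    gpointer frame;

    g_mutex_lock (&queue_lock);
    while (g_queue_is_empty (&frame_queue) && !eos)
      g_cond_wait (&queue_cond, &queue_lock);
    if (g_queue_is_empty (&frame_queue)) {
      /* queue drained and EOS reached */
      g_mutex_unlock (&queue_lock);
      break;
    }
    frame = g_queue_pop_head (&frame_queue);
    g_cond_signal (&queue_cond);        /* wake the producer if it was blocked */
    g_mutex_unlock (&queue_lock);

    g_print ("pushed frame %u downstream\n", GPOINTER_TO_UINT (frame));
  }
  return NULL;
}

int
main (void)
{
  GThread *consumer = g_thread_new ("output-loop", output_loop, NULL);
  GThread *producer = g_thread_new ("producer", frame_producer, NULL);

  g_thread_join (producer);
  g_thread_join (consumer);
  return 0;
}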

vtenc.c

@@ -80,6 +80,7 @@
#define VTENC_DEFAULT_MAX_KEYFRAME_INTERVAL 0
#define VTENC_DEFAULT_MAX_KEYFRAME_INTERVAL_DURATION 0
#define VTENC_DEFAULT_PRESERVE_ALPHA TRUE
#define VTENC_OUTPUT_QUEUE_SIZE 3
GST_DEBUG_CATEGORY (gst_vtenc_debug);
#define GST_CAT_DEFAULT (gst_vtenc_debug)
@@ -150,15 +151,14 @@ static void gst_vtenc_finalize (GObject * obj);
static gboolean gst_vtenc_start (GstVideoEncoder * enc);
static gboolean gst_vtenc_stop (GstVideoEncoder * enc);
static void gst_vtenc_loop (GstVTEnc * self);
static gboolean gst_vtenc_set_format (GstVideoEncoder * enc,
GstVideoCodecState * input_state);
static GstFlowReturn gst_vtenc_handle_frame (GstVideoEncoder * enc,
GstVideoCodecFrame * frame);
static GstFlowReturn gst_vtenc_finish (GstVideoEncoder * enc);
static gboolean gst_vtenc_flush (GstVideoEncoder * enc);
static void gst_vtenc_clear_cached_caps_downstream (GstVTEnc * self);
static gboolean gst_vtenc_sink_event (GstVideoEncoder * enc, GstEvent * event);
static VTCompressionSessionRef gst_vtenc_create_session (GstVTEnc * self);
static void gst_vtenc_destroy_session (GstVTEnc * self,
VTCompressionSessionRef * session);
@@ -359,6 +359,7 @@ gst_vtenc_class_init (GstVTEncClass * klass)
gstvideoencoder_class->handle_frame = gst_vtenc_handle_frame;
gstvideoencoder_class->finish = gst_vtenc_finish;
gstvideoencoder_class->flush = gst_vtenc_flush;
gstvideoencoder_class->sink_event = gst_vtenc_sink_event;
g_object_class_install_property (gobject_class, PROP_BITRATE,
g_param_spec_uint ("bitrate", "Bitrate",
@@ -439,6 +440,9 @@ gst_vtenc_init (GstVTEnc * self)
CFDictionaryCreate (NULL, (const void **) keyframe_props_keys,
(const void **) keyframe_props_values, G_N_ELEMENTS (keyframe_props_keys),
&kCFTypeDictionaryKeyCallBacks, &kCFTypeDictionaryValueCallBacks);
g_mutex_init (&self->queue_mutex);
g_cond_init (&self->queue_cond);
}
static void
@@ -447,6 +451,8 @@ gst_vtenc_finalize (GObject * obj)
GstVTEnc *self = GST_VTENC_CAST (obj);
CFRelease (self->keyframe_props);
g_mutex_clear (&self->queue_mutex);
g_cond_clear (&self->queue_cond);
G_OBJECT_CLASS (parent_class)->finalize (obj);
}
@@ -667,15 +673,62 @@ gst_vtenc_set_property (GObject * obj, guint prop_id, const GValue * value,
}
}
static gboolean
gst_vtenc_ensure_output_loop (GstVTEnc * self)
{
GstPad *pad = GST_VIDEO_ENCODER_SRC_PAD (self);
GstTask *task = GST_PAD_TASK (pad);
return gst_task_resume (task);
}
static void
gst_vtenc_pause_output_loop (GstVTEnc * self)
{
g_mutex_lock (&self->queue_mutex);
self->pause_task = TRUE;
g_cond_signal (&self->queue_cond);
g_mutex_unlock (&self->queue_mutex);
gst_pad_pause_task (GST_VIDEO_ENCODER_SRC_PAD (self));
GST_DEBUG_OBJECT (self, "paused output thread");
g_mutex_lock (&self->queue_mutex);
self->pause_task = FALSE;
g_mutex_unlock (&self->queue_mutex);
}
static GstFlowReturn
gst_vtenc_finish_encoding (GstVTEnc * self, gboolean is_flushing)
{
GST_DEBUG_OBJECT (self,
"complete encoding and clean buffer queue, is flushing %d", is_flushing);
GstVideoCodecFrame *outframe;
GstFlowReturn ret = GST_FLOW_OK;
OSStatus vt_status;
/* In case of EOS before the first buffer/caps */
if (self->session == NULL)
return GST_FLOW_OK;
/* If output loop failed to push things downstream */
if (self->downstream_ret != GST_FLOW_OK
&& self->downstream_ret != GST_FLOW_FLUSHING) {
GST_WARNING_OBJECT (self, "Output loop stopped with error (%s), leaving",
gst_flow_get_name (self->downstream_ret));
return self->downstream_ret;
}
if (is_flushing) {
g_mutex_lock (&self->queue_mutex);
self->is_flushing = TRUE;
g_cond_signal (&self->queue_cond);
g_mutex_unlock (&self->queue_mutex);
}
if (!gst_vtenc_ensure_output_loop (self)) {
GST_ERROR_OBJECT (self, "Output loop failed to resume");
return GST_FLOW_ERROR;
}
/* We need to unlock the stream lock here because
* it can wait for gst_vtenc_enqueue_buffer() to
* handle a buffer... which will take the stream
@@ -686,29 +739,22 @@ gst_vtenc_finish_encoding (GstVTEnc * self, gboolean is_flushing)
VTCompressionSessionCompleteFrames (self->session,
kCMTimePositiveInfinity);
GST_DEBUG_OBJECT (self, "VTCompressionSessionCompleteFrames ended");
GST_VIDEO_ENCODER_STREAM_LOCK (self);
if (vt_status != noErr) {
GST_WARNING_OBJECT (self, "VTCompressionSessionCompleteFrames returned %d",
(int) vt_status);
}
while ((outframe = g_async_queue_try_pop (self->cur_outframes))) {
if (is_flushing) {
GST_DEBUG_OBJECT (self, "flushing frame number %d",
outframe->system_frame_number);
gst_video_codec_frame_unref (outframe);
} else {
GST_DEBUG_OBJECT (self, "finish frame number %d",
outframe->system_frame_number);
ret =
gst_video_encoder_finish_frame (GST_VIDEO_ENCODER_CAST (self),
outframe);
}
}
gst_vtenc_pause_output_loop (self);
GST_VIDEO_ENCODER_STREAM_LOCK (self);
GST_DEBUG_OBJECT (self, "buffer queue cleaned");
if (self->downstream_ret == GST_FLOW_OK)
GST_DEBUG_OBJECT (self, "buffer queue cleaned");
else
GST_DEBUG_OBJECT (self,
"buffer queue not cleaned, output thread returned %s",
gst_flow_get_name (self->downstream_ret));
return ret;
return self->downstream_ret;
}
static gboolean
@@ -719,7 +765,24 @@ gst_vtenc_start (GstVideoEncoder * enc)
/* DTS can be negative if b-frames are enabled */
gst_video_encoder_set_min_pts (enc, GST_SECOND * 60 * 60 * 1000);
self->cur_outframes = g_async_queue_new ();
self->is_flushing = FALSE;
self->downstream_ret = GST_FLOW_OK;
self->output_queue = gst_queue_array_new (0);
/* Set clear_func to unref all remaining frames in gst_queue_array_free() */
gst_queue_array_set_clear_func (self->output_queue,
(GDestroyNotify) gst_video_codec_frame_unref);
/* Create the output task, but pause it immediately */
self->pause_task = TRUE;
if (!gst_pad_start_task (GST_VIDEO_ENCODER_SRC_PAD (enc),
(GstTaskFunction) gst_vtenc_loop, self, NULL)) {
GST_ERROR_OBJECT (self, "failed to start output thread");
return FALSE;
}
/* This blocks until the loop actually pauses */
gst_pad_pause_task (GST_VIDEO_ENCODER_SRC_PAD (enc));
self->pause_task = FALSE;
return TRUE;
}
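The start-then-pause sequence in gst_vtenc_start() above leans on GstTask semantics: gst_pad_start_task() spins up the pad task thread, gst_pad_pause_task() parks it (waiting for the current iteration to return), and a paused task can later be woken with gst_task_resume(), which refuses to act on a stopped task. As a rough illustration, here is a standalone sketch of that lifecycle using a bare GstTask instead of the pad task helpers; the names spin_func and task_lock are made up for the example, and gst_task_resume() requires a reasonably recent GStreamer.

#include <gst/gst.h>

/* Made-up task function: one short iteration of "work" per call. */
static void
spin_func (gpointer user_data)
{
  g_usleep (10 * 1000);         /* a real loop would wait on a GCond for queued frames */
}

int
main (int argc, char **argv)
{
  GstTask *task;
  GRecMutex task_lock;

  gst_init (&argc, &argv);
  g_rec_mutex_init (&task_lock);

  task = gst_task_new (spin_func, NULL, NULL);
  gst_task_set_lock (task, &task_lock);

  gst_task_start (task);        /* thread is created, spin_func starts iterating */
  gst_task_pause (task);        /* task parks once the current iteration returns */

  /* ... later, when there is actually something to push ... */
  if (!gst_task_resume (task))  /* only resumes a paused task, not a stopped one */
    g_printerr ("task was stopped, cannot resume\n");

  gst_task_join (task);         /* stops the task and waits for its thread to exit */
  gst_object_unref (task);
  g_rec_mutex_clear (&task_lock);
  return 0;
}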
@@ -731,12 +794,18 @@ gst_vtenc_stop (GstVideoEncoder * enc)
GST_VIDEO_ENCODER_STREAM_LOCK (self);
gst_vtenc_flush (enc);
self->downstream_ret = GST_FLOW_FLUSHING;
GST_VIDEO_ENCODER_STREAM_UNLOCK (self);
gst_pad_stop_task (GST_VIDEO_ENCODER_SRC_PAD (enc));
GST_OBJECT_LOCK (self);
gst_vtenc_destroy_session (self, &self->session);
GST_OBJECT_UNLOCK (self);
self->negotiate_downstream = TRUE;
self->is_flushing = TRUE;
if (self->profile_level)
CFRelease (self->profile_level);
self->profile_level = NULL;
@@ -748,10 +817,8 @@ gst_vtenc_stop (GstVideoEncoder * enc)
self->video_info.width = self->video_info.height = 0;
self->video_info.fps_n = self->video_info.fps_d = 0;
gst_vtenc_clear_cached_caps_downstream (self);
g_async_queue_unref (self->cur_outframes);
self->cur_outframes = NULL;
gst_queue_array_free (self->output_queue);
self->output_queue = NULL;
return TRUE;
}
@@ -925,17 +992,22 @@ gst_vtenc_set_format (GstVideoEncoder * enc, GstVideoCodecState * state)
GstVTEnc *self = GST_VTENC_CAST (enc);
VTCompressionSessionRef session;
if (self->input_state)
if (self->input_state) {
gst_vtenc_finish_encoding (self, FALSE);
gst_video_codec_state_unref (self->input_state);
self->input_state = gst_video_codec_state_ref (state);
self->video_info = state->info;
}
GST_OBJECT_LOCK (self);
gst_vtenc_destroy_session (self, &self->session);
GST_OBJECT_UNLOCK (self);
gst_vtenc_negotiate_specific_format_details (enc);
self->input_state = gst_video_codec_state_ref (state);
self->video_info = state->info;
if (!gst_vtenc_negotiate_specific_format_details (enc))
return FALSE;
self->negotiate_downstream = TRUE;
session = gst_vtenc_create_session (self);
GST_OBJECT_LOCK (self);
@@ -948,7 +1020,7 @@ gst_vtenc_set_format (GstVideoEncoder * enc, GstVideoCodecState * state)
static gboolean
gst_vtenc_is_negotiated (GstVTEnc * self)
{
return self->video_info.width != 0;
return self->session && self->video_info.width != 0;
}
/*
@@ -981,13 +1053,6 @@ gst_vtenc_negotiate_downstream (GstVTEnc * self, CMSampleBufferRef sbuf)
GstStructure *s;
GstVideoCodecState *state;
if (self->caps_width == self->video_info.width &&
self->caps_height == self->video_info.height &&
self->caps_fps_n == self->video_info.fps_n &&
self->caps_fps_d == self->video_info.fps_d) {
return TRUE;
}
caps = gst_pad_get_pad_template_caps (GST_VIDEO_ENCODER_SRC_PAD (self));
caps = gst_caps_make_writable (caps);
s = gst_caps_get_structure (caps, 0);
@@ -1062,21 +1127,9 @@ gst_vtenc_negotiate_downstream (GstVTEnc * self, CMSampleBufferRef sbuf)
gst_video_codec_state_unref (state);
result = gst_video_encoder_negotiate (GST_VIDEO_ENCODER_CAST (self));
self->caps_width = self->video_info.width;
self->caps_height = self->video_info.height;
self->caps_fps_n = self->video_info.fps_n;
self->caps_fps_d = self->video_info.fps_d;
return result;
}
static void
gst_vtenc_clear_cached_caps_downstream (GstVTEnc * self)
{
self->caps_width = self->caps_height = 0;
self->caps_fps_n = self->caps_fps_d = 0;
}
static GstFlowReturn
gst_vtenc_handle_frame (GstVideoEncoder * enc, GstVideoCodecFrame * frame)
{
@@ -1092,6 +1145,45 @@ not_negotiated:
return GST_FLOW_NOT_NEGOTIATED;
}
static gboolean
gst_vtenc_sink_event (GstVideoEncoder * enc, GstEvent * event)
{
GstVTEnc *self = GST_VTENC_CAST (enc);
GstEventType type = GST_EVENT_TYPE (event);
gboolean ret;
switch (type) {
case GST_EVENT_FLUSH_START:
GST_DEBUG_OBJECT (self, "flush start received, setting flushing flag");
g_mutex_lock (&self->queue_mutex);
self->is_flushing = TRUE;
g_cond_signal (&self->queue_cond);
g_mutex_unlock (&self->queue_mutex);
break;
default:
break;
}
ret = GST_VIDEO_ENCODER_CLASS (parent_class)->sink_event (enc, event);
switch (type) {
case GST_EVENT_FLUSH_STOP:
/* The base class handles this event and calls _flush().
* We can then safely reset the flushing flag. */
GST_DEBUG_OBJECT (self, "flush stop received, removing flushing flag");
g_mutex_lock (&self->queue_mutex);
self->is_flushing = FALSE;
g_mutex_unlock (&self->queue_mutex);
break;
default:
break;
}
return ret;
}
static GstFlowReturn
gst_vtenc_finish (GstVideoEncoder * enc)
{
@@ -1219,7 +1311,6 @@ gst_vtenc_set_colorimetry (GstVTEnc * self, VTCompressionSessionRef session)
}
}
static gboolean
gst_vtenc_compute_dts_offset (GstVTEnc * self, gint fps_n, gint fps_d)
{
@@ -1642,11 +1733,48 @@ gst_vtenc_encode_frame (GstVTEnc * self, GstVideoCodecFrame * frame)
CMTime ts, duration;
GstCoreMediaMeta *meta;
CVPixelBufferRef pbuf = NULL;
GstVideoCodecFrame *outframe;
OSStatus vt_status;
GstFlowReturn ret = GST_FLOW_OK;
gboolean renegotiated;
CFDictionaryRef frame_props = NULL;
GstTaskState task_state;
gboolean is_flushing;
/* If this condition changes later while we're still in this function,
* it'll just fail on next frame encode or in _finish() */
task_state = gst_pad_get_task_state (GST_VIDEO_ENCODER_SRC_PAD (self));
if (task_state == GST_TASK_STOPPED || task_state == GST_TASK_PAUSED) {
/* Abort if our loop failed to push frames downstream... */
if (self->downstream_ret != GST_FLOW_OK) {
if (self->downstream_ret == GST_FLOW_FLUSHING)
GST_DEBUG_OBJECT (self,
"Output loop stopped because of flushing, ignoring frame");
else
GST_WARNING_OBJECT (self,
"Output loop stopped with error (%s), leaving",
gst_flow_get_name (self->downstream_ret));
ret = self->downstream_ret;
goto drop;
}
/* ...or if it stopped because of the flushing flag while the queue
* was empty, in which case we didn't get GST_FLOW_FLUSHING... */
g_mutex_lock (&self->queue_mutex);
is_flushing = self->is_flushing;
g_mutex_unlock (&self->queue_mutex);
if (is_flushing) {
GST_DEBUG_OBJECT (self, "Flushing flag set, ignoring frame");
ret = GST_FLOW_FLUSHING;
goto drop;
}
/* .. or if it refuses to resume - e.g. it was stopped instead of paused */
if (!gst_vtenc_ensure_output_loop (self)) {
GST_ERROR_OBJECT (self, "Output loop failed to resume");
ret = GST_FLOW_ERROR;
goto drop;
}
}
if (GST_VIDEO_CODEC_FRAME_IS_FORCE_KEYFRAME (frame)) {
GST_INFO_OBJECT (self, "received force-keyframe-event, will force intra");
@@ -1812,38 +1940,16 @@ gst_vtenc_encode_frame (GstVTEnc * self, GstVideoCodecFrame * frame)
}
gst_video_codec_frame_unref (frame);
CVPixelBufferRelease (pbuf);
renegotiated = FALSE;
while ((outframe = g_async_queue_try_pop (self->cur_outframes))) {
if (outframe->output_buffer) {
if (!renegotiated) {
meta = gst_buffer_get_core_media_meta (outframe->output_buffer);
/* Try to renegotiate once */
if (meta) {
if (gst_vtenc_negotiate_downstream (self, meta->sample_buf)) {
renegotiated = TRUE;
} else {
ret = GST_FLOW_NOT_NEGOTIATED;
gst_video_codec_frame_unref (outframe);
/* the rest of the frames will be pop'd and unref'd later */
break;
}
}
}
gst_vtenc_update_latency (self);
}
/* releases frame, even if it has no output buffer (i.e. failed to encode) */
ret =
gst_video_encoder_finish_frame (GST_VIDEO_ENCODER_CAST (self),
outframe);
}
return ret;
drop:
{
gst_video_codec_frame_unref (frame);
return ret;
}
cv_error:
{
gst_video_codec_frame_unref (frame);
@@ -1858,8 +1964,8 @@ gst_vtenc_enqueue_buffer (void *outputCallbackRefCon,
VTEncodeInfoFlags infoFlags, CMSampleBufferRef sampleBuffer)
{
GstVTEnc *self = outputCallbackRefCon;
gboolean is_keyframe;
GstVideoCodecFrame *frame;
gboolean is_flushing;
frame =
gst_video_encoder_get_frame (GST_VIDEO_ENCODER_CAST (self),
@@ -1882,16 +1988,21 @@ gst_vtenc_enqueue_buffer (void *outputCallbackRefCon,
goto beach;
}
g_mutex_lock (&self->queue_mutex);
is_flushing = self->is_flushing;
g_mutex_unlock (&self->queue_mutex);
if (is_flushing) {
GST_DEBUG_OBJECT (self, "Ignoring frame %d because we're flushing",
frame->system_frame_number);
goto beach;
}
/* This may happen if we don't have enough bitrate */
if (sampleBuffer == NULL)
goto beach;
is_keyframe = gst_vtenc_buffer_is_keyframe (self, sampleBuffer);
if (is_keyframe) {
if (gst_vtenc_buffer_is_keyframe (self, sampleBuffer))
GST_VIDEO_CODEC_FRAME_SET_SYNC_POINT (frame);
gst_vtenc_clear_cached_caps_downstream (self);
}
/* We are dealing with block buffers here, so we don't need
* to enable the use of the video meta API on the core media buffer */
@@ -1899,10 +2010,128 @@ gst_vtenc_enqueue_buffer (void *outputCallbackRefCon,
gst_vtenc_update_timestamps (self, frame, sampleBuffer);
/* Limit the amount of frames in our output queue
* to avoid processing too many frames ahead */
g_mutex_lock (&self->queue_mutex);
while (gst_queue_array_get_length (self->output_queue) >
VTENC_OUTPUT_QUEUE_SIZE) {
g_cond_wait (&self->queue_cond, &self->queue_mutex);
}
g_mutex_unlock (&self->queue_mutex);
beach:
/* needed anyway so the frame will be released */
if (frame)
g_async_queue_push (self->cur_outframes, frame);
if (!frame)
return;
g_mutex_lock (&self->queue_mutex);
if (self->is_flushing) {
/* We can discard the frame here, no need to have the output loop do that */
gst_video_codec_frame_unref (frame);
g_mutex_unlock (&self->queue_mutex);
return;
}
/* Buffer-less frames will be discarded in the output loop */
gst_queue_array_push_tail (self->output_queue, frame);
g_cond_signal (&self->queue_cond);
g_mutex_unlock (&self->queue_mutex);
}
static void
gst_vtenc_loop (GstVTEnc * self)
{
GstVideoCodecFrame *outframe;
GstCoreMediaMeta *meta;
GstFlowReturn ret = GST_FLOW_OK;
gboolean should_pause;
g_mutex_lock (&self->queue_mutex);
while (gst_queue_array_is_empty (self->output_queue) && !self->pause_task
&& !self->is_flushing) {
g_cond_wait (&self->queue_cond, &self->queue_mutex);
}
if (self->pause_task) {
g_mutex_unlock (&self->queue_mutex);
gst_pad_pause_task (GST_VIDEO_ENCODER_CAST (self)->srcpad);
return;
}
while ((outframe = gst_queue_array_pop_head (self->output_queue))) {
g_cond_signal (&self->queue_cond);
g_mutex_unlock (&self->queue_mutex);
/* Keep the stream lock -> queue lock order */
GST_VIDEO_ENCODER_STREAM_LOCK (self);
g_mutex_lock (&self->queue_mutex);
if (self->is_flushing) {
GST_LOG_OBJECT (self, "flushing frame %d", outframe->system_frame_number);
gst_video_codec_frame_unref (outframe);
GST_VIDEO_ENCODER_STREAM_UNLOCK (self);
continue;
}
g_mutex_unlock (&self->queue_mutex);
if (self->negotiate_downstream &&
(meta = gst_buffer_get_core_media_meta (outframe->output_buffer))) {
if (!gst_vtenc_negotiate_downstream (self, meta->sample_buf)) {
ret = GST_FLOW_NOT_NEGOTIATED;
gst_video_codec_frame_unref (outframe);
g_mutex_lock (&self->queue_mutex);
/* the rest of the frames will be pop'd and unref'd later */
break;
}
self->negotiate_downstream = FALSE;
}
gst_vtenc_update_latency (self);
GST_LOG_OBJECT (self, "finishing frame %d", outframe->system_frame_number);
GST_VIDEO_ENCODER_STREAM_UNLOCK (self);
/* releases frame, even if it has no output buffer (i.e. failed to encode) */
ret =
gst_video_encoder_finish_frame (GST_VIDEO_ENCODER_CAST (self),
outframe);
g_mutex_lock (&self->queue_mutex);
if (ret != GST_FLOW_OK)
break;
}
g_mutex_unlock (&self->queue_mutex);
GST_VIDEO_ENCODER_STREAM_LOCK (self);
self->downstream_ret = ret;
/* We need to empty the queue immediately so that enqueue_buffer()
* can push out the current buffer, otherwise it can block other
* encoder callbacks completely */
if (ret == GST_FLOW_FLUSHING) {
g_mutex_lock (&self->queue_mutex);
while ((outframe = gst_queue_array_pop_head (self->output_queue))) {
GST_LOG_OBJECT (self, "flushing frame %d", outframe->system_frame_number);
gst_video_codec_frame_unref (outframe);
}
g_cond_signal (&self->queue_cond);
g_mutex_unlock (&self->queue_mutex);
}
GST_VIDEO_ENCODER_STREAM_UNLOCK (self);
/* Check is_flushing here in case we had an empty queue.
* In that scenario we also want to pause, as the encoder callback
* will discard any frames that are output while flushing */
g_mutex_lock (&self->queue_mutex);
should_pause = ret != GST_FLOW_OK || self->is_flushing;
g_mutex_unlock (&self->queue_mutex);
if (should_pause) {
GST_DEBUG_OBJECT (self, "pausing output task: %s",
ret != GST_FLOW_OK ? gst_flow_get_name (ret) : "flushing");
gst_pad_pause_task (GST_VIDEO_ENCODER_CAST (self)->srcpad);
}
}
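The "stream lock -> queue lock" ordering that gst_vtenc_loop() maintains above is what keeps the output thread from deadlocking against the streaming thread, since both may need the two locks at the same time. Below is a small, hypothetical illustration of the rule with plain GLib mutexes (stand-ins, not the element's actual locks): as long as every thread that needs both locks takes them in the same order, no thread can end up holding one lock while waiting forever for the other.

#include <glib.h>

/* Stand-ins for the encoder's two locks. */
static GMutex stream_lock;      /* think GST_VIDEO_ENCODER_STREAM_LOCK */
static GMutex queue_mutex;      /* think self->queue_mutex */

/* Both threads take stream_lock first and queue_mutex second.  If either
 * of them took queue_mutex first and then blocked on stream_lock, the two
 * could deadlock, each holding the lock the other one is waiting for. */
static gpointer
output_loop_thread (gpointer data)
{
  g_mutex_lock (&stream_lock);
  g_mutex_lock (&queue_mutex);
  /* ... pop a frame from the queue and finish it ... */
  g_mutex_unlock (&queue_mutex);
  g_mutex_unlock (&stream_lock);
  return NULL;
}

static gpointer
streaming_thread (gpointer data)
{
  g_mutex_lock (&stream_lock);
  g_mutex_lock (&queue_mutex);
  /* ... set a flag such as is_flushing and signal the condition ... */
  g_mutex_unlock (&queue_mutex);
  g_mutex_unlock (&stream_lock);
  return NULL;
}

int
main (void)
{
  GThread *a = g_thread_new ("output-loop", output_loop_thread, NULL);
  GThread *b = g_thread_new ("streaming", streaming_thread, NULL);

  g_thread_join (a);
  g_thread_join (b);
  return 0;
}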
static gboolean

vtenc.h

@@ -21,6 +21,7 @@
#define __GST_VTENC_H__
#include <gst/gst.h>
#include <gst/base/gstqueuearray.h>
#include <gst/codecparsers/gsth264parser.h>
#include <gst/video/video.h>
#include <VideoToolbox/VideoToolbox.h>
@@ -74,8 +75,6 @@ struct _GstVTEnc
gboolean dump_properties;
gboolean dump_attributes;
gint caps_width, caps_height;
gint caps_fps_n, caps_fps_d;
gboolean have_field_order;
GstVideoCodecState *input_state;
GstVideoInfo video_info;
@@ -83,7 +82,16 @@ struct _GstVTEnc
CFDictionaryRef keyframe_props;
GstClockTime dts_offset;
GAsyncQueue * cur_outframes;
GstQueueArray * output_queue;
/* Protects output_queue, is_flushing and pause_task */
GMutex queue_mutex;
GCond queue_cond;
/* downstream_ret is protected by the STREAM_LOCK */
GstFlowReturn downstream_ret;
gboolean negotiate_downstream;
gboolean is_flushing;
gboolean pause_task;
};
void gst_vtenc_register_elements (GstPlugin * plugin);