decklink2src: Use GstQueueArray instead of std::queue

Avoid heap allocation in capture thread
This commit is contained in:
Seungha Yang 2023-06-15 20:13:15 +09:00
parent 8696f33e38
commit ce71612e83
3 changed files with 56 additions and 33 deletions

View file

@ -29,6 +29,7 @@
#include <condition_variable>
#include <queue>
#include <string.h>
#include <gst/base/gstqueuearray.h>
GST_DEBUG_CATEGORY_STATIC (gst_decklink2_input_debug);
#define GST_CAT_DEFAULT gst_decklink2_input_debug
@ -317,6 +318,12 @@ private:
GstDeckLink2Input *input_;
};
struct GstDecklink2InputData
{
GstBuffer *buffer;
GstCaps *caps;
};
struct TimeMapping
{
GstClockTime xbase;
@ -334,7 +341,6 @@ struct GstDeckLink2InputPrivate
std::mutex lock;
std::condition_variable cond;
std::queue < GstSample * >queue;
std::atomic < bool >signal;
};
@ -373,6 +379,8 @@ struct _GstDeckLink2Input
guint64 audio_offset;
GstAdapter *audio_buf;
GstQueueArray *queue;
GstDeckLink2DisplayMode selected_mode;
BMDPixelFormat pixel_format;
GstElement *client;
@ -414,6 +422,13 @@ static void gst_decklink2_input_finalize (GObject * object);
static HRESULT gst_decklink2_input_set_allocator (GstDeckLink2Input * input,
IDeckLinkMemoryAllocator * allocator);
static void
gst_decklink2_input_data_clear (GstDecklink2InputData * data)
{
gst_clear_buffer (&data->buffer);
gst_clear_caps (&data->caps);
}
#define gst_decklink2_input_parent_class parent_class
G_DEFINE_TYPE (GstDeckLink2Input, gst_decklink2_input, GST_TYPE_OBJECT);
@ -436,6 +451,10 @@ gst_decklink2_input_init (GstDeckLink2Input * self)
FALSE, sizeof (GstDeckLink2DisplayMode));
self->audio_buf = gst_adapter_new ();
self->allocator = new IGstDeckLinkMemoryAllocator ();
self->queue = gst_queue_array_new_for_struct (sizeof (GstDecklink2InputData),
6);
gst_queue_array_set_clear_func (self->queue,
(GDestroyNotify) gst_decklink2_input_data_clear);
self->priv = new GstDeckLink2InputPrivate ();
}
@ -449,6 +468,7 @@ gst_decklink2_input_dispose (GObject * object)
gst_clear_caps (&self->selected_audio_caps);
g_clear_pointer (&self->vbi_parser, gst_video_vbi_parser_free);
g_clear_object (&self->audio_buf);
g_clear_pointer (&self->queue, gst_queue_array_free);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@ -1787,15 +1807,10 @@ out:
if (buffer) {
GstSample *sample;
gsize audio_size;
while (priv->queue.size () > self->buffer_size) {
GstBuffer *old_buf;
sample = priv->queue.front ();
old_buf = gst_sample_get_buffer (sample);
GST_DEBUG_OBJECT (self, "Dropping old buffer %" GST_PTR_FORMAT, old_buf);
gst_sample_unref (sample);
priv->queue.pop ();
while (gst_queue_array_get_length (self->queue) > self->buffer_size) {
GstDecklink2InputData *data = (GstDecklink2InputData *)
gst_queue_array_pop_head_struct (self->queue);
gst_decklink2_input_data_clear (data);
}
audio_size = gst_adapter_available (self->audio_buf);
@ -1851,11 +1866,12 @@ out:
gst_buffer_unref (audio_buf);
}
sample = gst_sample_new (buffer, self->selected_video_caps, NULL, NULL);
GST_LOG_OBJECT (self, "Enqueue buffer %" GST_PTR_FORMAT, buffer);
gst_buffer_unref (buffer);
priv->queue.push (sample);
GstDecklink2InputData new_data;
new_data.buffer = buffer;
new_data.caps = gst_caps_ref (self->selected_video_caps);
gst_queue_array_push_tail_struct (self->queue, &new_data);
priv->cond.notify_all ();
}
}
@ -1869,11 +1885,7 @@ gst_decklink2_input_stop_unlocked (GstDeckLink2Input * self)
gst_decklink2_input_disable_video (self);
gst_decklink2_input_disable_audio (self);
gst_decklink2_input_set_callback (self, NULL);
while (!priv->queue.empty ()) {
GstSample *sample = priv->queue.front ();
gst_sample_unref (sample);
priv->queue.pop ();
}
gst_queue_array_clear (self->queue);
gst_clear_caps (&self->selected_video_caps);
gst_clear_caps (&self->selected_audio_caps);
priv->signal = false;
@ -2084,13 +2096,20 @@ gst_decklink2_input_set_flushing (GstDeckLink2Input * input, gboolean flush)
}
GstFlowReturn
gst_decklink2_input_get_sample (GstDeckLink2Input * input, GstSample ** sample)
gst_decklink2_input_get_data (GstDeckLink2Input * input, GstBuffer ** buf,
GstCaps ** caps)
{
GstDeckLink2InputPrivate *priv = input->priv;
std::unique_lock < std::mutex > lk (priv->lock);
GstDecklink2InputData *data;
while (priv->queue.empty () && !input->flushing && input->started)
*buf = nullptr;
*caps = nullptr;
while (gst_queue_array_is_empty (input->queue)
&& !input->flushing && input->started) {
priv->cond.wait (lk);
}
if (input->flushing)
return GST_FLOW_FLUSHING;
@ -2098,8 +2117,10 @@ gst_decklink2_input_get_sample (GstDeckLink2Input * input, GstSample ** sample)
if (!input->started)
return GST_DECKLINK2_INPUT_FLOW_STOPPED;
*sample = priv->queue.front ();
priv->queue.pop ();
data = (GstDecklink2InputData *)
gst_queue_array_pop_head_struct (input->queue);
*buf = data->buffer;
*caps = data->caps;
return GST_FLOW_OK;
}

View file

@ -72,8 +72,9 @@ void gst_decklink2_input_stop (GstDeckLink2Input * input);
void gst_decklink2_input_set_flushing (GstDeckLink2Input * input,
gboolean flush);
GstFlowReturn gst_decklink2_input_get_sample (GstDeckLink2Input * input,
GstSample ** sample);
GstFlowReturn gst_decklink2_input_get_data (GstDeckLink2Input * input,
GstBuffer ** buffer,
GstCaps ** caps);
gboolean gst_decklink2_input_has_signal (GstDeckLink2Input * input);

View file

@ -568,8 +568,8 @@ static GstFlowReturn
gst_decklink2_src_create (GstPushSrc * src, GstBuffer ** buffer)
{
GstDeckLink2Src *self = GST_DECKLINK2_SRC (src);
GstSample *sample;
GstCaps *caps;
GstBuffer *buf = nullptr;
GstCaps *caps = nullptr;
GstFlowReturn ret;
GstDeckLink2SrcPrivate *priv = self->priv;
gboolean is_gap_buf = FALSE;
@ -581,7 +581,7 @@ again:
return GST_FLOW_ERROR;
}
ret = gst_decklink2_input_get_sample (self->input, &sample);
ret = gst_decklink2_input_get_data (self->input, &buf, &caps);
if (ret != GST_FLOW_OK) {
if (ret == GST_DECKLINK2_INPUT_FLOW_STOPPED) {
GST_DEBUG_OBJECT (self, "Input was stopped for restarting");
@ -592,20 +592,22 @@ again:
}
std::unique_lock < std::mutex > lk (priv->lock);
caps = gst_sample_get_caps (sample);
if (caps && !gst_caps_is_equal (caps, self->selected_caps)) {
GST_DEBUG_OBJECT (self, "Set updated caps %" GST_PTR_FORMAT, caps);
gst_caps_replace (&self->selected_caps, caps);
lk.unlock ();
if (!gst_pad_set_caps (GST_BASE_SRC_PAD (self), caps)) {
GST_ERROR_OBJECT (self, "Couldn't set caps");
gst_sample_unref (sample);
gst_clear_buffer (&buf);
gst_clear_caps (&caps);
return GST_FLOW_NOT_NEGOTIATED;
}
}
*buffer = gst_sample_get_buffer (sample);
if (GST_BUFFER_FLAG_IS_SET (*buffer, GST_BUFFER_FLAG_GAP))
gst_clear_caps (&caps);
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP))
is_gap_buf = TRUE;
if (is_gap_buf != self->is_gap_buf) {
@ -613,8 +615,7 @@ again:
g_object_notify (G_OBJECT (self), "signal");
}
gst_buffer_ref (*buffer);
gst_sample_unref (sample);
*buffer = buf;
return GST_FLOW_OK;
}