decklink: Use GstQueueArray instead of GQueue

Let's save two allocations and frees per frame.
This commit is contained in:
Sebastian Dröge 2017-03-24 14:25:48 +02:00
parent ddcd7735b6
commit d53da45886
4 changed files with 125 additions and 91 deletions

View file

@ -65,12 +65,10 @@ typedef struct
} CapturePacket;
static void
capture_packet_free (void *data)
capture_packet_clear (CapturePacket * packet)
{
CapturePacket *packet = (CapturePacket *) data;
packet->packet->Release ();
g_free (packet);
memset (packet, 0, sizeof (*packet));
}
typedef struct
@ -207,7 +205,9 @@ gst_decklink_audio_src_init (GstDecklinkAudioSrc * self)
g_mutex_init (&self->lock);
g_cond_init (&self->cond);
g_queue_init (&self->current_packets);
self->current_packets =
gst_queue_array_new_for_struct (sizeof (CapturePacket),
DEFAULT_BUFFER_SIZE);
}
void
@ -280,6 +280,15 @@ gst_decklink_audio_src_finalize (GObject * object)
g_mutex_clear (&self->lock);
g_cond_clear (&self->cond);
if (self->current_packets) {
while (gst_queue_array_get_length (self->current_packets) > 0) {
CapturePacket *tmp = (CapturePacket *)
gst_queue_array_pop_head_struct (self->current_packets);
capture_packet_clear (tmp);
}
gst_queue_array_free (self->current_packets);
self->current_packets = NULL;
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -508,21 +517,23 @@ gst_decklink_audio_src_got_packet (GstElement * element,
g_mutex_lock (&self->lock);
if (!self->flushing) {
CapturePacket *p;
CapturePacket p;
while (g_queue_get_length (&self->current_packets) >= self->buffer_size) {
p = (CapturePacket *) g_queue_pop_head (&self->current_packets);
while (gst_queue_array_get_length (self->current_packets) >=
self->buffer_size) {
CapturePacket *tmp = (CapturePacket *)
gst_queue_array_pop_head_struct (self->current_packets);
GST_WARNING_OBJECT (self, "Dropping old packet at %" GST_TIME_FORMAT,
GST_TIME_ARGS (p->timestamp));
capture_packet_free (p);
GST_TIME_ARGS (tmp->timestamp));
capture_packet_clear (tmp);
}
p = (CapturePacket *) g_malloc0 (sizeof (CapturePacket));
p->packet = packet;
p->timestamp = timestamp;
p->no_signal = no_signal;
memset (&p, 0, sizeof (p));
p.packet = packet;
p.timestamp = timestamp;
p.no_signal = no_signal;
packet->AddRef ();
g_queue_push_tail (&self->current_packets, p);
gst_queue_array_push_tail_struct (self->current_packets, &p);
g_cond_signal (&self->cond);
}
g_mutex_unlock (&self->lock);
@ -536,7 +547,7 @@ gst_decklink_audio_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
const guint8 *data;
glong sample_count;
gsize data_size;
CapturePacket *p;
CapturePacket p;
AudioPacket *ap;
GstClockTime timestamp, duration;
GstClockTime start_time, end_time;
@ -545,29 +556,29 @@ gst_decklink_audio_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
retry:
g_mutex_lock (&self->lock);
while (g_queue_is_empty (&self->current_packets) && !self->flushing) {
while (gst_queue_array_is_empty (self->current_packets) && !self->flushing) {
g_cond_wait (&self->cond, &self->lock);
}
p = (CapturePacket *) g_queue_pop_head (&self->current_packets);
g_mutex_unlock (&self->lock);
if (self->flushing) {
if (p)
capture_packet_free (p);
GST_DEBUG_OBJECT (self, "Flushing");
g_mutex_unlock (&self->lock);
return GST_FLOW_FLUSHING;
}
p->packet->GetBytes ((gpointer *) & data);
sample_count = p->packet->GetSampleFrameCount ();
p = *(CapturePacket *)
gst_queue_array_pop_head_struct (self->current_packets);
g_mutex_unlock (&self->lock);
p.packet->GetBytes ((gpointer *) & data);
sample_count = p.packet->GetSampleFrameCount ();
data_size = self->info.bpf * sample_count;
if (p->timestamp == GST_CLOCK_TIME_NONE && self->next_offset == (guint64) - 1) {
if (p.timestamp == GST_CLOCK_TIME_NONE && self->next_offset == (guint64) - 1) {
GST_DEBUG_OBJECT (self,
"Got packet without timestamp before initial "
"timestamp after discont - dropping");
capture_packet_free (p);
capture_packet_clear (&p);
goto retry;
}
@ -578,12 +589,12 @@ retry:
(gpointer) data, data_size, 0, data_size, ap,
(GDestroyNotify) audio_packet_free);
ap->packet = p->packet;
p->packet->AddRef ();
ap->packet = p.packet;
p.packet->AddRef ();
ap->input = self->input->input;
ap->input->AddRef ();
timestamp = p->timestamp;
timestamp = p.timestamp;
// Jitter and discontinuity handling, based on audiobasesrc
start_time = timestamp;
@ -651,7 +662,7 @@ retry:
self->info.rate) - timestamp;
}
if (p->no_signal)
if (p.no_signal)
GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_GAP);
GST_BUFFER_TIMESTAMP (*buffer) = timestamp;
GST_BUFFER_DURATION (*buffer) = duration;
@ -661,7 +672,7 @@ retry:
GST_TIME_FORMAT, *buffer, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (*buffer)));
capture_packet_free (p);
capture_packet_clear (&p);
return flow_ret;
}
@ -724,8 +735,11 @@ gst_decklink_audio_src_unlock_stop (GstBaseSrc * bsrc)
g_mutex_lock (&self->lock);
self->flushing = FALSE;
g_queue_foreach (&self->current_packets, (GFunc) capture_packet_free, NULL);
g_queue_clear (&self->current_packets);
while (gst_queue_array_get_length (self->current_packets) > 0) {
CapturePacket *tmp = (CapturePacket *)
gst_queue_array_pop_head_struct (self->current_packets);
capture_packet_clear (tmp);
}
g_mutex_unlock (&self->lock);
return TRUE;
@ -795,8 +809,11 @@ gst_decklink_audio_src_stop (GstDecklinkAudioSrc * self)
{
GST_DEBUG_OBJECT (self, "Stopping");
g_queue_foreach (&self->current_packets, (GFunc) capture_packet_free, NULL);
g_queue_clear (&self->current_packets);
while (gst_queue_array_get_length (self->current_packets) > 0) {
CapturePacket *tmp = (CapturePacket *)
gst_queue_array_pop_head_struct (self->current_packets);
capture_packet_clear (tmp);
}
if (self->input && self->input->audio_enabled) {
g_mutex_lock (&self->input->lock);

View file

@ -62,7 +62,7 @@ struct _GstDecklinkAudioSrc
GCond cond;
GMutex lock;
gboolean flushing;
GQueue current_packets;
GstQueueArray *current_packets;
/* properties for handling jittery timestamps */
GstClockTime alignment_threshold;

View file

@ -62,14 +62,12 @@ typedef struct
} CaptureFrame;
static void
capture_frame_free (void *data)
capture_frame_clear (CaptureFrame * frame)
{
CaptureFrame *frame = (CaptureFrame *) data;
frame->frame->Release ();
if (frame->tc)
gst_video_time_code_free (frame->tc);
g_free (frame);
memset (frame, 0, sizeof (*frame));
}
typedef struct
@ -251,7 +249,9 @@ gst_decklink_video_src_init (GstDecklinkVideoSrc * self)
g_mutex_init (&self->lock);
g_cond_init (&self->cond);
g_queue_init (&self->current_frames);
self->current_frames =
gst_queue_array_new_for_struct (sizeof (CaptureFrame),
DEFAULT_BUFFER_SIZE);
}
void
@ -371,6 +371,16 @@ gst_decklink_video_src_finalize (GObject * object)
g_mutex_clear (&self->lock);
g_cond_clear (&self->cond);
if (self->current_frames) {
while (gst_queue_array_get_length (self->current_frames) > 0) {
CaptureFrame *tmp = (CaptureFrame *)
gst_queue_array_pop_head_struct (self->current_frames);
capture_frame_clear (tmp);
}
gst_queue_array_free (self->current_frames);
self->current_frames = NULL;
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -656,25 +666,27 @@ gst_decklink_video_src_got_frame (GstElement * element,
GST_TIME_FORMAT ")", GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
if (!self->flushing) {
CaptureFrame *f;
CaptureFrame f;
const GstDecklinkMode *bmode;
GstVideoTimeCodeFlags flags = GST_VIDEO_TIME_CODE_FLAGS_NONE;
guint field_count = 0;
while (g_queue_get_length (&self->current_frames) >= self->buffer_size) {
f = (CaptureFrame *) g_queue_pop_head (&self->current_frames);
while (gst_queue_array_get_length (self->current_frames) >=
self->buffer_size) {
CaptureFrame *tmp = (CaptureFrame *)
gst_queue_array_pop_head_struct (self->current_frames);
GST_WARNING_OBJECT (self, "Dropping old frame at %" GST_TIME_FORMAT,
GST_TIME_ARGS (f->timestamp));
capture_frame_free (f);
GST_TIME_ARGS (tmp->timestamp));
capture_frame_clear (tmp);
}
f = (CaptureFrame *) g_malloc0 (sizeof (CaptureFrame));
f->frame = frame;
f->timestamp = timestamp;
f->duration = duration;
f->mode = mode;
f->format = frame->GetPixelFormat ();
f->no_signal = no_signal;
memset (&f, 0, sizeof (f));
f.frame = frame;
f.timestamp = timestamp;
f.duration = duration;
f.mode = mode;
f.format = frame->GetPixelFormat ();
f.no_signal = no_signal;
if (dtc != NULL) {
uint8_t hours, minutes, seconds, frames;
BMDTimecodeFlags bflags;
@ -684,7 +696,7 @@ gst_decklink_video_src_got_frame (GstElement * element,
if (res != S_OK) {
GST_ERROR ("Could not get components for timecode %p: 0x%08x", dtc,
res);
f->tc = NULL;
f.tc = NULL;
} else {
bflags = dtc->GetFlags ();
GST_DEBUG_OBJECT (self, "Got timecode %02d:%02d:%02d:%02d",
@ -703,17 +715,17 @@ gst_decklink_video_src_got_frame (GstElement * element,
flags =
(GstVideoTimeCodeFlags) (flags |
GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME);
f->tc =
f.tc =
gst_video_time_code_new (bmode->fps_n, bmode->fps_d, NULL, flags,
hours, minutes, seconds, frames, field_count);
}
dtc->Release ();
} else {
f->tc = NULL;
f.tc = NULL;
}
frame->AddRef ();
g_queue_push_tail (&self->current_frames, f);
gst_queue_array_push_tail_struct (self->current_frames, &f);
g_cond_signal (&self->cond);
}
g_mutex_unlock (&self->lock);
@ -727,63 +739,62 @@ gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
const guint8 *data;
gsize data_size;
VideoFrame *vf;
CaptureFrame *f;
CaptureFrame f;
GstCaps *caps;
gboolean caps_changed = FALSE;
const GstDecklinkMode *mode;
g_mutex_lock (&self->lock);
while (g_queue_is_empty (&self->current_frames) && !self->flushing) {
while (gst_queue_array_is_empty (self->current_frames) && !self->flushing) {
g_cond_wait (&self->cond, &self->lock);
}
f = (CaptureFrame *) g_queue_pop_head (&self->current_frames);
g_mutex_unlock (&self->lock);
if (self->flushing) {
if (f)
capture_frame_free (f);
GST_DEBUG_OBJECT (self, "Flushing");
g_mutex_unlock (&self->lock);
return GST_FLOW_FLUSHING;
}
f = *(CaptureFrame *) gst_queue_array_pop_head_struct (self->current_frames);
g_mutex_unlock (&self->lock);
// If we're not flushing, we should have a valid frame from the queue
g_assert (f != NULL);
g_assert (f.frame != NULL);
g_mutex_lock (&self->lock);
if (self->caps_mode != f->mode) {
if (self->caps_mode != f.mode) {
if (self->mode == GST_DECKLINK_MODE_AUTO) {
GST_DEBUG_OBJECT (self, "Mode changed from %d to %d", self->caps_mode,
f->mode);
f.mode);
caps_changed = TRUE;
self->caps_mode = f->mode;
self->caps_mode = f.mode;
} else {
g_mutex_unlock (&self->lock);
GST_ELEMENT_ERROR (self, CORE, NEGOTIATION,
("Invalid mode in captured frame"),
("Mode set to %d but captured %d", self->caps_mode, f->mode));
capture_frame_free (f);
("Mode set to %d but captured %d", self->caps_mode, f.mode));
capture_frame_clear (&f);
return GST_FLOW_NOT_NEGOTIATED;
}
}
if (self->caps_format != f->format) {
if (self->caps_format != f.format) {
if (self->video_format == GST_DECKLINK_VIDEO_FORMAT_AUTO) {
GST_DEBUG_OBJECT (self, "Format changed from %d to %d", self->caps_format,
f->format);
f.format);
caps_changed = TRUE;
self->caps_format = f->format;
self->caps_format = f.format;
} else {
g_mutex_unlock (&self->lock);
GST_ELEMENT_ERROR (self, CORE, NEGOTIATION,
("Invalid pixel format in captured frame"),
("Format set to %d but captured %d", self->caps_format, f->format));
capture_frame_free (f);
("Format set to %d but captured %d", self->caps_format, f.format));
capture_frame_clear (&f);
return GST_FLOW_NOT_NEGOTIATED;
}
}
g_mutex_unlock (&self->lock);
if (caps_changed) {
caps = gst_decklink_mode_get_caps (f->mode, f->format, TRUE);
caps = gst_decklink_mode_get_caps (f.mode, f.format, TRUE);
gst_video_info_from_caps (&self->info, caps);
gst_base_src_set_caps (GST_BASE_SRC_CAST (bsrc), caps);
gst_element_post_message (GST_ELEMENT_CAST (self),
@ -792,7 +803,7 @@ gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
}
f->frame->GetBytes ((gpointer *) & data);
f.frame->GetBytes ((gpointer *) & data);
data_size = self->info.size;
vf = (VideoFrame *) g_malloc0 (sizeof (VideoFrame));
@ -802,12 +813,12 @@ gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
(gpointer) data, data_size, 0, data_size, vf,
(GDestroyNotify) video_frame_free);
vf->frame = f->frame;
f->frame->AddRef ();
vf->frame = f.frame;
f.frame->AddRef ();
vf->input = self->input->input;
vf->input->AddRef ();
if (f->no_signal) {
if (f.no_signal) {
if (!self->no_signal) {
self->no_signal = TRUE;
g_object_notify (G_OBJECT (self), "signal");
@ -823,12 +834,12 @@ gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
}
}
if (f->no_signal)
if (f.no_signal)
GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_GAP);
GST_BUFFER_TIMESTAMP (*buffer) = f->timestamp;
GST_BUFFER_DURATION (*buffer) = f->duration;
if (f->tc != NULL)
gst_buffer_add_video_time_code_meta (*buffer, f->tc);
GST_BUFFER_TIMESTAMP (*buffer) = f.timestamp;
GST_BUFFER_DURATION (*buffer) = f.duration;
if (f.tc != NULL)
gst_buffer_add_video_time_code_meta (*buffer, f.tc);
mode = gst_decklink_get_mode (self->mode);
if (mode->interlaced && mode->tff)
@ -840,7 +851,7 @@ gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
GST_TIME_FORMAT, *buffer, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (*buffer)));
capture_frame_free (f);
capture_frame_clear (&f);
return flow_ret;
}
@ -900,8 +911,11 @@ gst_decklink_video_src_unlock_stop (GstBaseSrc * bsrc)
g_mutex_lock (&self->lock);
self->flushing = FALSE;
g_queue_foreach (&self->current_frames, (GFunc) capture_frame_free, NULL);
g_queue_clear (&self->current_frames);
while (gst_queue_array_get_length (self->current_frames) > 0) {
CaptureFrame *tmp =
(CaptureFrame *) gst_queue_array_pop_head_struct (self->current_frames);
capture_frame_clear (tmp);
}
g_mutex_unlock (&self->lock);
return TRUE;
@ -960,8 +974,11 @@ gst_decklink_video_src_stop (GstDecklinkVideoSrc * self)
{
GST_DEBUG_OBJECT (self, "Stopping");
g_queue_foreach (&self->current_frames, (GFunc) capture_frame_free, NULL);
g_queue_clear (&self->current_frames);
while (gst_queue_array_get_length (self->current_frames) > 0) {
CaptureFrame *tmp =
(CaptureFrame *) gst_queue_array_pop_head_struct (self->current_frames);
capture_frame_clear (tmp);
}
self->caps_mode = GST_DECKLINK_MODE_AUTO;
if (self->input && self->input->video_enabled) {

View file

@ -68,7 +68,7 @@ struct _GstDecklinkVideoSrc
GCond cond;
GMutex lock;
gboolean flushing;
GQueue current_frames;
GstQueueArray *current_frames;
gboolean no_signal;
guint buffer_size;