Never post messages from the capture thread, but instead handle everything from the streaming thread

Sebastian Dröge 2023-06-14 10:54:35 +03:00
parent 2d13fb16e8
commit 002437f37d
2 changed files with 293 additions and 115 deletions
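The commit replaces the pattern where the DMA capture thread posted bus messages (GST_ELEMENT_ERROR/WARNING/INFO) and "signal" property notifications directly with one where the capture thread only enqueues tagged QueueItem structs; the streaming thread pops them in gst_aja_src_create() and does all the reporting. Below is a minimal standalone sketch of that producer/consumer shape, using plain GLib (GAsyncQueue and GThread stand in for the element's GstQueueArray plus mutex/cond; all names in the sketch are hypothetical):

/* Minimal sketch of the pattern this commit adopts: the "capture" thread
 * never reports anything itself, it only enqueues tagged items; the
 * "streaming" thread pops them and does all user-visible reporting.
 * Compile with: gcc sketch.c $(pkg-config --cflags --libs glib-2.0)
 */
#include <glib.h>

typedef enum {
  ITEM_FRAME,
  ITEM_SIGNAL_CHANGE,
  ITEM_ERROR,
} ItemType;

typedef struct {
  ItemType type;
  union {
    struct { guint64 capture_time; } frame;
    struct { gboolean have_signal; } signal_change;
    struct { gchar *msg; } error;
  };
} Item;

static gpointer capture_func(gpointer data) {
  GAsyncQueue *queue = data;

  for (guint i = 0; i < 5; i++) {
    Item *item = g_new0(Item, 1);
    item->type = ITEM_FRAME;
    item->frame.capture_time = i * G_GINT64_CONSTANT(40000000);
    g_async_queue_push(queue, item); /* never report from this thread */
  }

  Item *item = g_new0(Item, 1);
  item->type = ITEM_SIGNAL_CHANGE;
  item->signal_change.have_signal = FALSE;
  g_async_queue_push(queue, item);
  return NULL;
}

int main(void) {
  GAsyncQueue *queue = g_async_queue_new();
  GThread *capture = g_thread_new("capture", capture_func, queue);

  /* The "streaming" side: all reporting happens here. */
  gboolean done = FALSE;
  while (!done) {
    Item *item = g_async_queue_pop(queue);
    switch (item->type) {
      case ITEM_FRAME:
        g_print("frame @ %" G_GUINT64_FORMAT "\n", item->frame.capture_time);
        break;
      case ITEM_SIGNAL_CHANGE:
        g_print("signal %s\n", item->signal_change.have_signal ? "up" : "down");
        done = TRUE; /* stand-in for flushing/EOS handling */
        break;
      case ITEM_ERROR:
        g_printerr("error: %s\n", item->error.msg);
        g_free(item->error.msg);
        done = TRUE;
        break;
    }
    g_free(item);
  }

  g_thread_join(capture);
  g_async_queue_unref(queue);
  return 0;
}

The indirection matters because posting a GstMessage can dispatch into arbitrary application code; keeping the real-time capture loop to queue-push plus cond-signal mirrors what the element now does.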


@@ -72,24 +72,79 @@ enum {
   PROP_SIGNAL,
 };
 
+// Make these plain C structs for usage in GstQueueArray
+G_BEGIN_DECLS
+
 typedef enum {
+  QUEUE_ITEM_TYPE_DUMMY,
   QUEUE_ITEM_TYPE_FRAME,
+  QUEUE_ITEM_TYPE_SIGNAL_CHANGE,
+  QUEUE_ITEM_TYPE_ERROR,
+  QUEUE_ITEM_TYPE_FRAMES_DROPPED,
 } QueueItemType;
 
 typedef struct {
   QueueItemType type;
 
-  // For FRAME
-  GstClockTime capture_time;
-  GstBuffer *video_buffer;
-  GstBuffer *audio_buffer;
-  GstBuffer *anc_buffer, *anc_buffer2;
-  NTV2_RP188 tc;
-  NTV2VideoFormat detected_format;
-  guint32 vpid;
+  union {
+    // For DUMMY
+    struct {
+      gchar dummy;
+    } dummy;
+    // For FRAME
+    struct {
+      GstClockTime capture_time;
+      GstBuffer *video_buffer;
+      GstBuffer *audio_buffer;
+      GstBuffer *anc_buffer, *anc_buffer2;
+      NTV2_RP188 tc;
+      NTV2VideoFormat detected_format;
+      guint32 vpid;
+    } frame;
+    // For SIGNAL_CHANGE
+    struct {
+      gboolean have_signal;
+      NTV2VideoFormat detected_format;
+      guint32 vpid;
+    } signal_change;
+    // For ERROR
+    struct {
+      GstMessage *msg;
+    } error;
+    // For FRAMES_DROPPED
+    struct {
+      gboolean driver_side;
+      GstClockTime timestamp_start, timestamp_end;
+    } frames_dropped;
+  };
 } QueueItem;
 
+G_END_DECLS
+
+static void queue_item_clear(QueueItem *item) {
+  switch (item->type) {
+    case QUEUE_ITEM_TYPE_DUMMY:
+      break;
+    case QUEUE_ITEM_TYPE_FRAME:
+      gst_clear_buffer(&item->frame.video_buffer);
+      gst_clear_buffer(&item->frame.audio_buffer);
+      gst_clear_buffer(&item->frame.anc_buffer);
+      gst_clear_buffer(&item->frame.anc_buffer2);
+      item->frame.tc.~NTV2_RP188();
+      break;
+    case QUEUE_ITEM_TYPE_SIGNAL_CHANGE:
+      break;
+    case QUEUE_ITEM_TYPE_ERROR:
+      gst_clear_message(&item->error.msg);
+      break;
+    case QUEUE_ITEM_TYPE_FRAMES_DROPPED:
+      break;
+  }
+
+  item->type = QUEUE_ITEM_TYPE_DUMMY;
+}
+
 static void gst_aja_src_set_property(GObject *object, guint property_id,
                                      const GValue *value, GParamSpec *pspec);
 static void gst_aja_src_get_property(GObject *object, guint property_id,
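For context on the "plain C structs" comment in the hunk above: GstQueueArray (from libgstbase) stores entries by value, copying the struct into its backing array, which is why QueueItem must stay trivially copyable and needs an explicit queue_item_clear() rather than C++ constructors/destructors. A small standalone sketch of that by-value behaviour; the Point payload is made up, the gst_queue_array_*() calls are the real API:

/* Compile with:
 *   gcc queuearray.c $(pkg-config --cflags --libs gstreamer-base-1.0)
 */
#include <gst/gst.h>
#include <gst/base/gstqueuearray.h>

typedef struct {
  gint x, y;
} Point; /* hypothetical payload */

int main(int argc, char **argv) {
  gst_init(&argc, &argv);

  GstQueueArray *q = gst_queue_array_new_for_struct(sizeof(Point), 4);

  Point p = {1, 2};
  gst_queue_array_push_tail_struct(q, &p); /* struct is copied into the array */
  p.x = 99;                                /* does not affect the stored copy */

  Point *head = (Point *)gst_queue_array_pop_head_struct(q);
  g_print("%d,%d\n", head->x, head->y);    /* prints "1,2" */

  gst_queue_array_free(q);
  return 0;
}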
@@ -1448,13 +1503,9 @@ static gboolean gst_aja_src_stop(GstAjaSrc *self) {
   GST_OBJECT_UNLOCK(self);
 
   while ((item = (QueueItem *)gst_queue_array_pop_head_struct(self->queue))) {
-    if (item->type == QUEUE_ITEM_TYPE_FRAME) {
-      gst_clear_buffer(&item->video_buffer);
-      gst_clear_buffer(&item->audio_buffer);
-      gst_clear_buffer(&item->anc_buffer);
-      gst_clear_buffer(&item->anc_buffer2);
-    }
+    queue_item_clear(item);
   }
+  self->queue_num_frames = 0;
 
   if (self->buffer_pool) {
     gst_buffer_pool_set_active(self->buffer_pool, FALSE);
@@ -1629,7 +1680,12 @@ static gboolean gst_aja_src_unlock_stop(GstBaseSrc *bsrc) {
 static GstFlowReturn gst_aja_src_create(GstPushSrc *psrc, GstBuffer **buffer) {
   GstAjaSrc *self = GST_AJA_SRC(psrc);
   GstFlowReturn flow_ret = GST_FLOW_OK;
-  QueueItem item;
+  QueueItem item = {
+      .type = QUEUE_ITEM_TYPE_DUMMY,
+  };
 
+next_item:
+  item.type = QUEUE_ITEM_TYPE_DUMMY;
+
   g_mutex_lock(&self->queue_lock);
   while (gst_queue_array_is_empty(self->queue) && !self->flushing) {
@@ -1643,13 +1699,80 @@ static GstFlowReturn gst_aja_src_create(GstPushSrc *psrc, GstBuffer **buffer) {
   }
 
   item = *(QueueItem *)gst_queue_array_pop_head_struct(self->queue);
+  if (item.type == QUEUE_ITEM_TYPE_FRAME) {
+    self->queue_num_frames -= 1;
+  }
   g_mutex_unlock(&self->queue_lock);
 
-  *buffer = item.video_buffer;
-  gst_buffer_add_aja_audio_meta(*buffer, item.audio_buffer);
-  gst_buffer_unref(item.audio_buffer);
+  switch (item.type) {
+    case QUEUE_ITEM_TYPE_DUMMY:
+      queue_item_clear(&item);
+      goto next_item;
+    case QUEUE_ITEM_TYPE_SIGNAL_CHANGE:
+      // These are already only produced when signal status is changing
+      if (item.signal_change.have_signal) {
+        GST_ELEMENT_INFO(GST_ELEMENT(self), RESOURCE, READ,
+                         ("Signal recovered"), ("Input source detected"));
+        self->signal = TRUE;
+        g_object_notify(G_OBJECT(self), "signal");
+      } else if (!item.signal_change.have_signal) {
+        if (item.signal_change.detected_format != ::NTV2_FORMAT_UNKNOWN) {
+          std::string format_string =
+              NTV2VideoFormatToString(item.signal_change.detected_format);
+          GST_ELEMENT_WARNING_WITH_DETAILS(
+              GST_ELEMENT(self), RESOURCE, READ, ("Signal lost"),
+              ("Input source with different mode %s was detected",
+               format_string.c_str()),
+              ("detected-format", G_TYPE_STRING, format_string.c_str(), "vpid",
+               G_TYPE_UINT, item.signal_change.vpid, NULL));
+        } else {
+          GST_ELEMENT_WARNING(GST_ELEMENT(self), RESOURCE, READ,
+                              ("Signal lost"),
+                              ("No input source was detected"));
+        }
+        self->signal = FALSE;
+        g_object_notify(G_OBJECT(self), "signal");
+      }
+      queue_item_clear(&item);
+      goto next_item;
+    case QUEUE_ITEM_TYPE_ERROR:
+      GST_ERROR_OBJECT(self, "Stopping because of error on capture thread");
+      gst_element_post_message(GST_ELEMENT(self),
+                               (GstMessage *)g_steal_pointer(&item.error.msg));
+      queue_item_clear(&item);
+      return GST_FLOW_ERROR;
+    case QUEUE_ITEM_TYPE_FRAMES_DROPPED:
+      GST_WARNING_OBJECT(
+          self, "Dropped frames from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
+          GST_TIME_ARGS(item.frames_dropped.timestamp_start),
+          GST_TIME_ARGS(item.frames_dropped.timestamp_end));
+      gst_element_post_message(
+          GST_ELEMENT(self),
+          gst_message_new_qos(GST_OBJECT_CAST(self), TRUE, GST_CLOCK_TIME_NONE,
+                              GST_CLOCK_TIME_NONE,
+                              item.frames_dropped.timestamp_start,
+                              item.frames_dropped.timestamp_end -
+                                  item.frames_dropped.timestamp_start));
+      queue_item_clear(&item);
+      goto next_item;
+    case QUEUE_ITEM_TYPE_FRAME:
+      // fall through below
+      break;
+  }
 
-  if (item.tc.IsValid()) {
+  g_assert(item.type == QUEUE_ITEM_TYPE_FRAME);
+
+  if (!self->signal) {
+    self->signal = TRUE;
+    g_object_notify(G_OBJECT(self), "signal");
+  }
+
+  *buffer = (GstBuffer *)g_steal_pointer(&item.frame.video_buffer);
+  gst_buffer_add_aja_audio_meta(*buffer, item.frame.audio_buffer);
+  gst_clear_buffer(&item.frame.audio_buffer);
+
+  if (item.frame.tc.IsValid()) {
     TimecodeFormat tc_format = ::kTCFormatUnknown;
     GstVideoTimeCodeFlags flags = GST_VIDEO_TIME_CODE_FLAGS_NONE;
@@ -1687,7 +1810,7 @@ static GstFlowReturn gst_aja_src_create(GstPushSrc *psrc, GstBuffer **buffer) {
       flags =
           (GstVideoTimeCodeFlags)(flags | GST_VIDEO_TIME_CODE_FLAGS_INTERLACED);
 
-    CRP188 rp188(item.tc, tc_format);
+    CRP188 rp188(item.frame.tc, tc_format);
 
     {
       std::stringstream os;
@@ -1710,40 +1833,41 @@ static GstFlowReturn gst_aja_src_create(GstPushSrc *psrc, GstBuffer **buffer) {
 
   AJAAncillaryList anc_packets;
 
-  if (item.anc_buffer) {
+  if (item.frame.anc_buffer) {
     GstMapInfo map = GST_MAP_INFO_INIT;
     GstMapInfo map2 = GST_MAP_INFO_INIT;
 
-    gst_buffer_map(item.anc_buffer, &map, GST_MAP_READ);
-    if (item.anc_buffer2) gst_buffer_map(item.anc_buffer2, &map2, GST_MAP_READ);
+    gst_buffer_map(item.frame.anc_buffer, &map, GST_MAP_READ);
+    if (item.frame.anc_buffer2)
+      gst_buffer_map(item.frame.anc_buffer2, &map2, GST_MAP_READ);
 
     NTV2_POINTER ptr1(map.data, map.size);
     NTV2_POINTER ptr2(map2.data, map2.size);
 
     AJAAncillaryList::SetFromDeviceAncBuffers(ptr1, ptr2, anc_packets);
 
-    if (item.anc_buffer2) gst_buffer_unmap(item.anc_buffer2, &map2);
-    gst_buffer_unmap(item.anc_buffer, &map);
+    if (item.frame.anc_buffer2) gst_buffer_unmap(item.frame.anc_buffer2, &map2);
+    gst_buffer_unmap(item.frame.anc_buffer, &map);
   } else if (self->vanc_mode != ::NTV2_VANCMODE_OFF) {
     GstMapInfo map;
 
     NTV2FormatDescriptor format_desc(self->video_format, ::NTV2_FBF_10BIT_YCBCR,
                                      self->vanc_mode);
 
-    gst_buffer_map(item.video_buffer, &map, GST_MAP_READ);
+    gst_buffer_map(*buffer, &map, GST_MAP_READ);
     NTV2_POINTER ptr(map.data, map.size);
     AJAAncillaryList::SetFromVANCData(ptr, format_desc, anc_packets);
-    gst_buffer_unmap(item.video_buffer, &map);
+    gst_buffer_unmap(*buffer, &map);
 
     guint offset =
         format_desc.RasterLineToByteOffset(format_desc.GetFirstActiveLine());
     guint size = format_desc.GetVisibleRasterBytes();
-    gst_buffer_resize(item.video_buffer, offset, size);
+    gst_buffer_resize(*buffer, offset, size);
   }
 
-  gst_clear_buffer(&item.anc_buffer);
-  gst_clear_buffer(&item.anc_buffer2);
+  gst_clear_buffer(&item.frame.anc_buffer);
+  gst_clear_buffer(&item.frame.anc_buffer2);
 
   // Not using CountAncillaryDataWithType(AJAAncillaryDataType_Cea708) etc
   // here because for SD it doesn't recognize the packets. It assumes they
@@ -1849,7 +1973,7 @@ static GstFlowReturn gst_aja_src_create(GstPushSrc *psrc, GstBuffer **buffer) {
                                              is_letterbox, bar1, bar2);
 
     const NTV2Standard standard(
-        ::GetNTV2StandardFromVideoFormat(item.detected_format));
+        ::GetNTV2StandardFromVideoFormat(item.frame.detected_format));
     const NTV2SmpteLineNumber smpte_line_num_info =
         ::GetSmpteLineNumber(standard);
     bool field2 =
@@ -1866,7 +1990,7 @@
 
   bool caps_changed = false;
 
-  CNTV2VPID vpid(item.vpid);
+  CNTV2VPID vpid(item.frame.vpid);
   if (vpid.IsValid()) {
     GstVideoInfo info;
@@ -1876,7 +2000,8 @@
       GST_TRACE_OBJECT(self, "Got valid VPID %s", os.str().c_str());
     }
 
-    if (gst_video_info_from_ntv2_video_format(&info, item.detected_format)) {
+    if (gst_video_info_from_ntv2_video_format(&info,
+                                              item.frame.detected_format)) {
       switch (vpid.GetTransferCharacteristics()) {
         default:
         case NTV2_VPID_TC_SDR_TV:
@@ -1937,7 +2062,8 @@
   } else {
     GstVideoInfo info;
 
-    if (gst_video_info_from_ntv2_video_format(&info, item.detected_format)) {
+    if (gst_video_info_from_ntv2_video_format(&info,
+                                              item.frame.detected_format)) {
       // Widescreen PAL/NTSC
       if (aspect_ratio_flag && info.height == 486) {
         info.par_n = 40;
@@ -1988,11 +2114,43 @@
     }
   }
 
+  queue_item_clear(&item);
+
   GST_TRACE_OBJECT(self, "Outputting buffer %" GST_PTR_FORMAT, *buffer);
 
   return flow_ret;
 }
 
+#define AJA_SRC_ERROR(el, domain, code, text, debug)                        \
+  G_STMT_START {                                                            \
+    gchar *__txt = _gst_element_error_printf text;                          \
+    gchar *__dbg = _gst_element_error_printf debug;                         \
+    GstMessage *__msg;                                                      \
+    GError *__err;                                                          \
+    gchar *__name, *__fmt_dbg;                                              \
+    if (__txt) GST_WARNING_OBJECT(el, "error: %s", __txt);                  \
+    if (__dbg) GST_WARNING_OBJECT(el, "error: %s", __dbg);                  \
+    if (!__txt)                                                             \
+      __txt = gst_error_get_message(GST_##domain##_ERROR,                   \
+                                    GST_##domain##_ERROR_##code);           \
+    __err = g_error_new_literal(GST_##domain##_ERROR,                       \
+                                GST_##domain##_ERROR_##code, __txt);        \
+    __name = gst_object_get_path_string(GST_OBJECT_CAST(el));               \
+    if (__dbg)                                                              \
+      __fmt_dbg = g_strdup_printf("%s(%d): %s (): %s:\n%s", __FILE__,       \
+                                  __LINE__, GST_FUNCTION, __name, __dbg);   \
+    else                                                                    \
+      __fmt_dbg = g_strdup_printf("%s(%d): %s (): %s", __FILE__, __LINE__,  \
+                                  GST_FUNCTION, __name);                    \
+    g_free(__name);                                                         \
+    g_free(__dbg);                                                          \
+    __msg = gst_message_new_error(GST_OBJECT(el), __err, __fmt_dbg);        \
+    QueueItem item = {.type = QUEUE_ITEM_TYPE_ERROR, .error{.msg = __msg}}; \
+    gst_queue_array_push_tail_struct(el->queue, &item);                     \
+    g_cond_signal(&el->queue_cond);                                         \
+  }                                                                         \
+  G_STMT_END;
+
 static void capture_thread_func(AJAThread *thread, void *data) {
   GstAjaSrc *self = GST_AJA_SRC(data);
   GstClock *clock = NULL;
@@ -2000,6 +2158,7 @@ static void capture_thread_func(AJAThread *thread, void *data) {
   guint64 frames_dropped_last = G_MAXUINT64;
   gboolean have_signal = TRUE;
   guint iterations_without_frame = 0;
+  NTV2VideoFormat last_detected_video_format = ::NTV2_FORMAT_UNKNOWN;
 
   if (self->capture_cpu_core != G_MAXUINT) {
     cpu_set_t mask;
@@ -2049,8 +2208,8 @@ restart:
   if (!gst_aja_src_configure(self)) {
     g_mutex_lock(&self->queue_lock);
-    GST_ELEMENT_ERROR(self, STREAM, FAILED, (NULL),
-                      ("Failed to configure device"));
+    AJA_SRC_ERROR(self, STREAM, FAILED, (NULL),
+                  ("Failed to configure device"));
     goto out;
   }
 
   g_mutex_lock(&self->queue_lock);
@@ -2061,15 +2220,15 @@
       GST_DEBUG_OBJECT(self, "No signal, waiting");
       frames_dropped_last = G_MAXUINT64;
       if (have_signal) {
-        GST_ELEMENT_WARNING(GST_ELEMENT(self), RESOURCE, READ,
-                            ("Signal lost"),
-                            ("No input source was detected"));
+        QueueItem item = {
+            .type = QUEUE_ITEM_TYPE_SIGNAL_CHANGE,
+            .signal_change = {.have_signal = FALSE,
+                              .detected_format = ::NTV2_FORMAT_UNKNOWN,
+                              .vpid = 0}};
+        gst_queue_array_push_tail_struct(self->queue, &item);
+        g_cond_signal(&self->queue_cond);
         have_signal = FALSE;
       }
-      if (self->signal) {
-        self->signal = FALSE;
-        g_object_notify(G_OBJECT(self), "signal");
-      }
       self->device->device->WaitForInputVerticalInterrupt(self->channel);
       continue;
     }
@@ -2084,8 +2243,8 @@
         self->device, self->channel, self->start_frame);
 
     if (assigned_start_frame == -1) {
-      GST_ELEMENT_ERROR(self, STREAM, FAILED, (NULL),
-                        ("Failed to allocate %u frames", start_frame));
+      AJA_SRC_ERROR(self, STREAM, FAILED, (NULL),
+                    ("Failed to allocate %u frames", start_frame));
       goto out;
     }
@@ -2104,8 +2263,8 @@
                 ? AUTOCIRCULATE_WITH_ANC
                 : 0),
         1, start_frame, end_frame)) {
-    GST_ELEMENT_ERROR(self, STREAM, FAILED, (NULL),
-                      ("Failed to initialize autocirculate"));
+    AJA_SRC_ERROR(self, STREAM, FAILED, (NULL),
+                  ("Failed to initialize autocirculate"));
     goto out;
   }
@@ -2183,14 +2342,16 @@
       g_mutex_unlock(&self->queue_lock);
       frames_dropped_last = G_MAXUINT64;
       if (have_signal) {
-        GST_ELEMENT_WARNING(GST_ELEMENT(self), RESOURCE, READ, ("Signal lost"),
-                            ("No input source was detected"));
+        QueueItem item = {
+            .type = QUEUE_ITEM_TYPE_SIGNAL_CHANGE,
+            .signal_change = {.have_signal = FALSE,
+                              .detected_format = ::NTV2_FORMAT_UNKNOWN,
+                              .vpid = 0}};
+        last_detected_video_format = ::NTV2_FORMAT_UNKNOWN;
+        gst_queue_array_push_tail_struct(self->queue, &item);
+        g_cond_signal(&self->queue_cond);
         have_signal = FALSE;
       }
-      if (self->signal) {
-        self->signal = FALSE;
-        g_object_notify(G_OBJECT(self), "signal");
-      }
       self->device->device->WaitForInputVerticalInterrupt(self->channel);
       g_mutex_lock(&self->queue_lock);
       continue;
@@ -2216,16 +2377,17 @@
                        effective_string.c_str());
       g_mutex_unlock(&self->queue_lock);
       frames_dropped_last = G_MAXUINT64;
-      if (have_signal) {
-        GST_ELEMENT_WARNING(GST_ELEMENT(self), RESOURCE, READ, ("Signal lost"),
-                            ("Different input source (%s) was detected",
-                             current_string.c_str()));
+      if (have_signal || current_video_format != last_detected_video_format) {
+        QueueItem item = {
+            .type = QUEUE_ITEM_TYPE_SIGNAL_CHANGE,
+            .signal_change = {.have_signal = FALSE,
+                              .detected_format = current_video_format,
+                              .vpid = vpid_a}};
+        last_detected_video_format = current_video_format;
+        gst_queue_array_push_tail_struct(self->queue, &item);
+        g_cond_signal(&self->queue_cond);
         have_signal = FALSE;
       }
-      if (self->signal) {
-        self->signal = FALSE;
-        g_object_notify(G_OBJECT(self), "signal");
-      }
       self->device->device->WaitForInputVerticalInterrupt(self->channel);
       g_mutex_lock(&self->queue_lock);
       continue;
@@ -2265,10 +2427,13 @@
           status.acFramesProcessed + status.acFramesDropped,
           self->configured_info.fps_n,
           self->configured_info.fps_d * GST_SECOND);
-      GstMessage *msg = gst_message_new_qos(
-          GST_OBJECT_CAST(self), TRUE, GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE,
-          timestamp, timestamp_end - timestamp);
-      gst_element_post_message(GST_ELEMENT_CAST(self), msg);
+
+      QueueItem item = {.type = QUEUE_ITEM_TYPE_FRAMES_DROPPED,
+                        .frames_dropped = {.driver_side = TRUE,
+                                           .timestamp_start = timestamp,
+                                           .timestamp_end = timestamp_end}};
+      gst_queue_array_push_tail_struct(self->queue, &item);
+      g_cond_signal(&self->queue_cond);
       frames_dropped_last = status.acFramesDropped;
     }
@@ -2284,29 +2449,30 @@
     AUTOCIRCULATE_TRANSFER transfer;
 
     if (!have_signal) {
-      GST_ELEMENT_INFO(GST_ELEMENT(self), RESOURCE, READ,
-                       ("Signal recovered"), ("Input source detected"));
+      QueueItem item = {
+          .type = QUEUE_ITEM_TYPE_SIGNAL_CHANGE,
+          .signal_change = {.have_signal = TRUE,
+                            .detected_format = current_video_format,
+                            .vpid = vpid_a}};
+      gst_queue_array_push_tail_struct(self->queue, &item);
+      g_cond_signal(&self->queue_cond);
       have_signal = TRUE;
     }
-    if (!self->signal) {
-      self->signal = TRUE;
-      g_object_notify(G_OBJECT(self), "signal");
-    }
     iterations_without_frame = 0;
 
     if (gst_buffer_pool_acquire_buffer(self->buffer_pool, &video_buffer,
                                        NULL) != GST_FLOW_OK) {
-      GST_ELEMENT_ERROR(self, STREAM, FAILED, (NULL),
-                        ("Failed to acquire video buffer"));
+      AJA_SRC_ERROR(self, STREAM, FAILED, (NULL),
+                    ("Failed to acquire video buffer"));
       break;
     }
 
     if (gst_buffer_pool_acquire_buffer(self->audio_buffer_pool, &audio_buffer,
                                        NULL) != GST_FLOW_OK) {
       gst_buffer_unref(video_buffer);
-      GST_ELEMENT_ERROR(self, STREAM, FAILED, (NULL),
-                        ("Failed to acquire audio buffer"));
+      AJA_SRC_ERROR(self, STREAM, FAILED, (NULL),
+                    ("Failed to acquire audio buffer"));
       break;
     }
@@ -2316,8 +2482,8 @@
                                          NULL) != GST_FLOW_OK) {
         gst_buffer_unref(audio_buffer);
         gst_buffer_unref(video_buffer);
-        GST_ELEMENT_ERROR(self, STREAM, FAILED, (NULL),
-                          ("Failed to acquire anc buffer"));
+        AJA_SRC_ERROR(self, STREAM, FAILED, (NULL),
+                      ("Failed to acquire anc buffer"));
         break;
       }
@@ -2328,8 +2494,8 @@
         gst_buffer_unref(anc_buffer);
         gst_buffer_unref(audio_buffer);
         gst_buffer_unref(video_buffer);
-        GST_ELEMENT_ERROR(self, STREAM, FAILED, (NULL),
-                          ("Failed to acquire anc buffer"));
+        AJA_SRC_ERROR(self, STREAM, FAILED, (NULL),
+                      ("Failed to acquire anc buffer"));
         break;
       }
     }
@@ -2434,45 +2600,54 @@
       QueueItem item = {
           .type = QUEUE_ITEM_TYPE_FRAME,
-          .capture_time = now_gst,
-          .video_buffer = video_buffer,
-          .audio_buffer = audio_buffer,
-          .anc_buffer = anc_buffer,
-          .anc_buffer2 = anc_buffer2,
-          .tc = time_code,
-          .detected_format =
-              (self->quad_mode ? ::GetQuadSizedVideoFormat(current_video_format)
-                               : current_video_format),
-          .vpid = vpid_a};
+          .frame = {.capture_time = now_gst,
+                    .video_buffer = video_buffer,
+                    .audio_buffer = audio_buffer,
+                    .anc_buffer = anc_buffer,
+                    .anc_buffer2 = anc_buffer2,
+                    .tc = time_code,
+                    .detected_format =
+                        (self->quad_mode
+                             ? ::GetQuadSizedVideoFormat(current_video_format)
+                             : current_video_format),
+                    .vpid = vpid_a}};
 
-      while (gst_queue_array_get_length(self->queue) >= self->queue_size) {
-        QueueItem *tmp =
-            (QueueItem *)gst_queue_array_pop_head_struct(self->queue);
-        if (tmp->type == QUEUE_ITEM_TYPE_FRAME) {
-          GST_WARNING_OBJECT(self, "Element queue overrun, dropping old frame");
+      while (self->queue_num_frames >= self->queue_size) {
+        guint n = gst_queue_array_get_length(self->queue);
 
-          GstMessage *msg = gst_message_new_qos(
-              GST_OBJECT_CAST(self), TRUE, GST_CLOCK_TIME_NONE,
-              GST_CLOCK_TIME_NONE, tmp->capture_time,
-              gst_util_uint64_scale(GST_SECOND, self->configured_info.fps_d,
-                                    self->configured_info.fps_n));
-          gst_element_post_message(GST_ELEMENT_CAST(self), msg);
-
-          gst_clear_buffer(&tmp->video_buffer);
-          gst_clear_buffer(&tmp->audio_buffer);
-          gst_clear_buffer(&tmp->anc_buffer);
-          gst_clear_buffer(&tmp->anc_buffer2);
+        for (guint i = 0; i < n; i++) {
+          QueueItem *tmp =
+              (QueueItem *)gst_queue_array_peek_nth_struct(self->queue, i);
+          if (tmp->type == QUEUE_ITEM_TYPE_FRAME) {
+            GST_WARNING_OBJECT(self,
+                               "Element queue overrun, dropping old frame");
+
+            QueueItem item = {
+                .type = QUEUE_ITEM_TYPE_FRAMES_DROPPED,
+                .frames_dropped = {
+                    .driver_side = FALSE,
+                    .timestamp_start = tmp->frame.capture_time,
+                    .timestamp_end =
+                        tmp->frame.capture_time +
+                        gst_util_uint64_scale(GST_SECOND,
+                                              self->configured_info.fps_d,
+                                              self->configured_info.fps_n)}};
+            queue_item_clear(tmp);
+            gst_queue_array_drop_struct(self->queue, i, NULL);
+            gst_queue_array_push_tail_struct(self->queue, &item);
+            self->queue_num_frames -= 1;
+            g_cond_signal(&self->queue_cond);
+            break;
+          }
         }
       }
 
       GST_TRACE_OBJECT(self, "Queuing frame %" GST_TIME_FORMAT,
                        GST_TIME_ARGS(now_gst));
       gst_queue_array_push_tail_struct(self->queue, &item);
-      GST_TRACE_OBJECT(self, "%u frames queued",
-                       gst_queue_array_get_length(self->queue));
+      self->queue_num_frames += 1;
+      GST_TRACE_OBJECT(self, "%u frames queued", self->queue_num_frames);
       g_cond_signal(&self->queue_cond);
     } else {
       g_mutex_unlock(&self->queue_lock);
@@ -2483,15 +2658,17 @@
         iterations_without_frame++;
       } else {
         frames_dropped_last = G_MAXUINT64;
-        if (have_signal) {
-          GST_ELEMENT_WARNING(GST_ELEMENT(self), RESOURCE, READ,
-                              ("Signal lost"), ("No frames captured"));
+        if (have_signal || last_detected_video_format != current_video_format) {
+          QueueItem item = {
+              .type = QUEUE_ITEM_TYPE_SIGNAL_CHANGE,
+              .signal_change = {.have_signal = TRUE,
+                                .detected_format = current_video_format,
+                                .vpid = vpid_a}};
+          last_detected_video_format = current_video_format;
+          gst_queue_array_push_tail_struct(self->queue, &item);
+          g_cond_signal(&self->queue_cond);
           have_signal = FALSE;
         }
-        if (self->signal) {
-          self->signal = FALSE;
-          g_object_notify(G_OBJECT(self), "signal");
-        }
       }
       self->device->device->WaitForInputVerticalInterrupt(self->channel);


@@ -48,6 +48,7 @@ struct _GstAjaSrc {
   GMutex queue_lock;
   GCond queue_cond;
   GstQueueArray *queue;
+  guint queue_num_frames;
   gboolean playing;
   gboolean shutdown;
   gboolean flushing;