avtp: Add fragmented packets handling to CVF depayloader

This patch adds to the CVF depayloader the capability to regroup H.264
fragmented FU-A packets.

After all packets are regrouped, they are added to the "stash" of H.264
NAL units that will be sent as soon as an AVTP packet with M bit set is
found (usually, the last fragment).

Unrecognized fragments (such as first fragment seen, but with no Start
bit set) are discarded - and any NAL units on the "stash" are sent
downstream, as if a SEQNUM discontinuty happened.
This commit is contained in:
Ederson de Souza 2019-03-20 16:40:13 -07:00
parent 45d2f5a779
commit 45624661ba
2 changed files with 150 additions and 2 deletions

View file

@ -61,6 +61,12 @@ static GstFlowReturn gst_avtp_cvf_depay_chain (GstPad * pad, GstObject * parent,
#define FU_A_TYPE 28
#define FU_B_TYPE 29
#define NRI_MASK 0x60
#define NRI_SHIFT 5
#define START_MASK 0x80
#define START_SHIFT 7
#define END_MASK 0x40
#define END_SHIFT 6
#define NAL_TYPE_MASK 0x1f
/* pad templates */
@ -100,6 +106,7 @@ static void
gst_avtp_cvf_depay_init (GstAvtpCvfDepay * avtpcvfdepay)
{
avtpcvfdepay->out_buffer = NULL;
avtpcvfdepay->fragments = NULL;
avtpcvfdepay->seqnum = 0;
}
@ -238,6 +245,13 @@ gst_avtp_cvf_depay_push_and_discard (GstAvtpCvfDepay * avtpcvfdepay)
gst_avtp_cvf_depay_push (avtpcvfdepay);
}
/* Discard any incomplete fragments */
if (avtpcvfdepay->fragments != NULL) {
GST_DEBUG_OBJECT (avtpcvfdepay, "Discarding incomplete fragments");
gst_buffer_unref (avtpcvfdepay->fragments);
avtpcvfdepay->fragments = NULL;
}
}
static gboolean
@ -446,11 +460,138 @@ gst_avtp_cvf_depay_get_nalu_size (GstAvtpCvfDepay * avtpcvfdepay,
}
static GstFlowReturn
gst_avtp_cvf_depay_handle_fragment_a (GstAvtpCvfDepay * avtpcvfdepay,
gst_avtp_cvf_depay_process_last_fragment (GstAvtpCvfDepay * avtpcvfdepay,
GstBuffer * avtpdu, GstMapInfo * map, gsize offset, gsize nalu_size,
guint8 nri, guint8 nal_type)
{
GstBuffer *nal;
GstMapInfo map_nal;
GstClockTime pts, dts;
gboolean M;
GstFlowReturn ret = GST_FLOW_OK;
if (G_UNLIKELY (avtpcvfdepay->fragments == NULL)) {
GST_WARNING_OBJECT (avtpcvfdepay,
"Received final fragment, but no start fragment received. Dropping it.");
goto end;
}
gst_buffer_copy_into (avtpcvfdepay->fragments, avtpdu,
GST_BUFFER_COPY_MEMORY, offset, nalu_size);
/* Allocate buffer to keep the nal_header (1 byte) and the NALu size (4 bytes) */
nal = gst_buffer_new_allocate (NULL, 4 + 1, NULL);
if (G_UNLIKELY (nal == NULL)) {
GST_ERROR_OBJECT (avtpcvfdepay, "Could not allocate buffer");
ret = GST_FLOW_ERROR;
goto end;
}
gst_buffer_map (nal, &map_nal, GST_MAP_READWRITE);
/* Add NAL size. Extra 1 counts the nal_header */
nalu_size = gst_buffer_get_size (avtpcvfdepay->fragments) + 1;
map_nal.data[0] = nalu_size >> 24;
map_nal.data[1] = nalu_size >> 16;
map_nal.data[2] = nalu_size >> 8;
map_nal.data[3] = nalu_size;
/* Finally, add the nal_header */
map_nal.data[4] = (nri << 5) | nal_type;
gst_buffer_unmap (nal, &map_nal);
nal = gst_buffer_append (nal, avtpcvfdepay->fragments);
gst_avtp_cvf_depay_get_avtp_timestamps (avtpcvfdepay, map, &pts, &dts);
GST_BUFFER_PTS (nal) = pts;
GST_BUFFER_DTS (nal) = dts;
gst_avtp_cvf_depay_get_M (avtpcvfdepay, map, &M);
gst_avtp_cvf_depay_internal_push (avtpcvfdepay, nal, M);
avtpcvfdepay->fragments = NULL;
end:
return ret;
}
static GstFlowReturn
gst_avtp_cvf_depay_handle_fu_a (GstAvtpCvfDepay * avtpcvfdepay,
GstBuffer * avtpdu, GstMapInfo * map)
{
GstFlowReturn ret = GST_FLOW_OK;
struct avtp_stream_pdu *pdu;
struct avtp_cvf_h264_payload *pay;
guint8 fu_header, fu_indicator, nal_type, start, end, nri;
guint16 nalu_size;
gsize offset;
if (G_UNLIKELY (map->size - AVTP_CVF_H264_HEADER_SIZE < 2)) {
GST_ERROR_OBJECT (avtpcvfdepay,
"Buffer too small to contain fragment headers, size: %lu",
map->size - AVTP_CVF_H264_HEADER_SIZE);
gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
goto end;
}
pdu = (struct avtp_stream_pdu *) map->data;
pay = (struct avtp_cvf_h264_payload *) pdu->avtp_payload;
fu_indicator = pay->h264_data[0];
nri = (fu_indicator & NRI_MASK) >> NRI_SHIFT;
GST_DEBUG_OBJECT (avtpcvfdepay, "Fragment indicator - NRI: %u", nri);
fu_header = pay->h264_data[1];
nal_type = fu_header & NAL_TYPE_MASK;
start = (fu_header & START_MASK) >> START_SHIFT;
end = (fu_header & END_MASK) >> END_SHIFT;
GST_DEBUG_OBJECT (avtpcvfdepay,
"Fragment header - type: %u start: %u end: %u", nal_type, start, end);
if (G_UNLIKELY (start && end)) {
GST_ERROR_OBJECT (avtpcvfdepay,
"Invalid fragment header - 'start' and 'end' bits set");
gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
goto end;
}
/* Size and offset also ignores the FU_HEADER and FU_INDICATOR fields,
* hence the "sizeof(guint8) * 2" */
offset = AVTP_CVF_H264_HEADER_SIZE + sizeof (guint8) * 2;
gst_avtp_cvf_depay_get_nalu_size (avtpcvfdepay, map, &nalu_size);
nalu_size -= sizeof (guint8) * 2;
if (start) {
if (G_UNLIKELY (avtpcvfdepay->fragments != NULL)) {
GST_WARNING_OBJECT (avtpcvfdepay,
"Received starting fragment, but previous one is not complete. Dropping old fragment");
gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
}
avtpcvfdepay->fragments =
gst_buffer_copy_region (avtpdu, GST_BUFFER_COPY_MEMORY, offset,
nalu_size);
}
if (!start && !end) {
if (G_UNLIKELY (avtpcvfdepay->fragments == NULL)) {
GST_WARNING_OBJECT (avtpcvfdepay,
"Received intermediate fragment, but no start fragment received. Dropping it.");
gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
goto end;
}
gst_buffer_copy_into (avtpcvfdepay->fragments, avtpdu,
GST_BUFFER_COPY_MEMORY, offset, nalu_size);
}
if (end) {
ret =
gst_avtp_cvf_depay_process_last_fragment (avtpcvfdepay, avtpdu, map,
offset, nalu_size, nri, nal_type);
}
end:
return ret;
}
@ -466,6 +607,12 @@ gst_avtp_cvf_depay_handle_single_nal (GstAvtpCvfDepay * avtpcvfdepay,
GST_DEBUG_OBJECT (avtpcvfdepay, "Handling single NAL unit");
if (avtpcvfdepay->fragments != NULL) {
GST_WARNING_OBJECT (avtpcvfdepay,
"Received single NAL unit, but previous fragment is incomplete. Dropping fragment.");
gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
}
gst_avtp_cvf_depay_get_avtp_timestamps (avtpcvfdepay, map, &pts, &dts);
gst_avtp_cvf_depay_get_nalu_size (avtpcvfdepay, map, &nalu_size);
gst_avtp_cvf_depay_get_M (avtpcvfdepay, map, &M);
@ -513,7 +660,7 @@ gst_avtp_cvf_depay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
"AVTP aggregation packets not supported, dropping it");
break;
case FU_A_TYPE:
ret = gst_avtp_cvf_depay_handle_fragment_a (avtpcvfdepay, buffer, &map);
ret = gst_avtp_cvf_depay_handle_fu_a (avtpcvfdepay, buffer, &map);
break;
case FU_B_TYPE:
GST_WARNING_OBJECT (avtpcvfdepay,

View file

@ -47,6 +47,7 @@ struct _GstAvtpCvfDepay
guint8 seqnum;
GstBuffer *out_buffer;
GstBuffer *fragments;
};
struct _GstAvtpCvfDepayClass