irtspparse: handle multiple and incomplete frames

Interleaved frames can be fragmented between
incoming frames. Thus, we can have multiple
frames within the single input frame, as well as
incomplete frame. Now it preserves parsing
state and handle both situations.

Fixes #991
This commit is contained in:
OleksandrKvl 2019-07-02 14:30:35 +03:00 committed by Sebastian Dröge
parent 98e4967337
commit 9a39ba6a35
2 changed files with 113 additions and 53 deletions

View file

@ -117,12 +117,15 @@ gst_irtsp_parse_class_init (GstIRTSPParseClass * klass)
static void
gst_irtsp_parse_reset (GstIRTSPParse * IRTSPParse)
{
IRTSPParse->state = IRTSP_SEARCH_FRAME;
IRTSPParse->current_offset = 0;
IRTSPParse->discont = FALSE;
}
static void
gst_irtsp_parse_init (GstIRTSPParse * IRTSPParse)
{
gst_base_parse_set_min_frame_size (GST_BASE_PARSE (IRTSPParse), 4);
gst_base_parse_set_min_frame_size (GST_BASE_PARSE (IRTSPParse), 1);
gst_irtsp_parse_reset (IRTSPParse);
}
@ -152,64 +155,110 @@ gst_irtsp_parse_stop (GstBaseParse * parse)
return TRUE;
}
static void
gst_irtsp_set_caps_once (GstBaseParse * parse)
{
if (!gst_pad_has_current_caps (GST_BASE_PARSE_SRC_PAD (parse))) {
GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtp");
gst_pad_set_caps (GST_BASE_PARSE_SRC_PAD (parse), caps);
gst_caps_unref (caps);
}
}
static GstFlowReturn
gst_irtsp_parse_handle_frame (GstBaseParse * parse,
GstBaseParseFrame * frame, gint * skipsize)
{
static const guint frame_header_size = sizeof (guint8) * 4;
static const guint8 frame_header_magic = 0x24;
GstIRTSPParse *IRTSPParse = GST_IRTSP_PARSE (parse);
GstBuffer *buf = frame->buffer;
GstByteReader reader;
gint off;
GstMapInfo map;
guint framesize;
const guint8 *frame_start;
guint8 current_channel_id;
const guint8 *data;
guint data_size;
guint flushed_size;
if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (frame->buffer,
GST_BUFFER_FLAG_DISCONT))) {
IRTSPParse->discont = TRUE;
}
gst_buffer_map (buf, &map, GST_MAP_READ);
if (G_UNLIKELY (map.size < 4))
goto exit;
gst_byte_reader_init (&reader, map.data, map.size);
start:
g_assert (map.size >= IRTSPParse->current_offset);
data = &map.data[IRTSPParse->current_offset];
data_size = map.size - IRTSPParse->current_offset;
off = gst_byte_reader_masked_scan_uint32 (&reader, 0xffff0000,
0x24000000 + (IRTSPParse->channel_id << 16), 0, map.size);
switch (IRTSPParse->state) {
case IRTSP_SEARCH_FRAME:
/* Use the first occurence of 0x24 as a start of interleaved frames.
* This 'trick' allows us to parse a dump that doesn't contain RTSP
* handshake. It's up to user to provide the data where the first 0x24
* is an RTSP frame */
frame_start = memchr (data, frame_header_magic, data_size);
if (frame_start) {
IRTSPParse->state = IRTSP_PARSE_FRAME;
IRTSPParse->current_offset += frame_start - data;
goto start;
} else {
IRTSPParse->current_offset += data_size;
}
break;
case IRTSP_PARSE_FRAME:
if (data_size > 0 && data[0] != frame_header_magic) {
IRTSPParse->state = IRTSP_SEARCH_FRAME;
goto start;
}
GST_LOG_OBJECT (parse, "possible sync at buffer offset %d", off);
if (data_size >= frame_header_size) {
IRTSPParse->current_offset += frame_header_size;
current_channel_id = data[1];
IRTSPParse->frame_size = GST_READ_UINT16_BE (&data[2]);
if (current_channel_id != IRTSPParse->target_channel_id) {
IRTSPParse->state = IRTSP_SKIP_FRAME;
} else {
IRTSPParse->state = IRTSP_FLUSH_FRAME;
}
goto start;
}
break;
case IRTSP_SKIP_FRAME:
if (data_size >= IRTSPParse->frame_size) {
IRTSPParse->current_offset += IRTSPParse->frame_size;
IRTSPParse->state = IRTSP_PARSE_FRAME;
goto start;
}
break;
case IRTSP_FLUSH_FRAME:
if (data_size >= IRTSPParse->frame_size) {
gst_irtsp_set_caps_once (parse);
gst_buffer_unmap (buf, &map);
/* didn't find anything that looks like a sync word, skip */
if (off < 0) {
*skipsize = map.size - 3;
goto exit;
frame->out_buffer = gst_buffer_copy (frame->buffer);
gst_buffer_resize (frame->out_buffer, IRTSPParse->current_offset,
IRTSPParse->frame_size);
if (G_UNLIKELY (IRTSPParse->discont)) {
GST_BUFFER_FLAG_SET (frame->out_buffer, GST_BUFFER_FLAG_DISCONT);
IRTSPParse->discont = FALSE;
}
flushed_size = IRTSPParse->current_offset + IRTSPParse->frame_size;
IRTSPParse->current_offset = 0;
IRTSPParse->state = IRTSP_PARSE_FRAME;
return gst_base_parse_finish_frame (parse, frame, flushed_size);
}
break;
default:
g_assert_not_reached ();
break;
}
/* possible frame header, but not at offset 0? skip bytes before sync */
if (off > 0) {
*skipsize = off;
goto exit;
}
framesize = GST_READ_UINT16_BE (map.data + 2) + 4;
GST_LOG_OBJECT (parse, "got frame size %d", framesize);
if (!gst_pad_has_current_caps (GST_BASE_PARSE_SRC_PAD (parse))) {
GstCaps *caps;
caps = gst_caps_new_empty_simple ("application/x-rtp");
gst_pad_set_caps (GST_BASE_PARSE_SRC_PAD (parse), caps);
gst_caps_unref (caps);
}
if (framesize <= map.size) {
gst_buffer_unmap (buf, &map);
/* HACK HACK skip header.
* could also ask baseparse to skip this,
* but that would give us a discontinuity for free
* which is a bit too much to have on all our packets */
frame->out_buffer = gst_buffer_copy (frame->buffer);
gst_buffer_resize (frame->out_buffer, 4, -1);
GST_BUFFER_FLAG_UNSET (frame->out_buffer, GST_BUFFER_FLAG_DISCONT);
return gst_base_parse_finish_frame (parse, frame, framesize);
}
exit:
gst_buffer_unmap (buf, &map);
return GST_FLOW_OK;
}
@ -222,7 +271,7 @@ gst_irtsp_parse_set_property (GObject * object,
switch (prop_id) {
case PROP_CHANNEL_ID:
IRTSPParse->channel_id = g_value_get_int (value);
IRTSPParse->target_channel_id = g_value_get_int (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -238,7 +287,7 @@ gst_irtsp_parse_get_property (GObject * object,
switch (prop_id) {
case PROP_CHANNEL_ID:
g_value_set_int (value, IRTSPParse->channel_id);
g_value_set_int (value, IRTSPParse->target_channel_id);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);

View file

@ -26,7 +26,6 @@
#include <gst/base/gstbaseparse.h>
G_BEGIN_DECLS
#define GST_TYPE_IRTSP_PARSE \
(gst_irtsp_parse_get_type())
#define GST_IRTSP_PARSE(obj) \
@ -37,20 +36,32 @@ G_BEGIN_DECLS
(G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_IRTSP_PARSE))
#define GST_IS_IRTSP_PARSE_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_IRTSP_PARSE))
typedef struct _GstIRTSPParse GstIRTSPParse;
typedef struct _GstIRTSPParseClass GstIRTSPParseClass;
typedef enum
{
IRTSP_SEARCH_FRAME,
IRTSP_PARSE_FRAME,
IRTSP_FLUSH_FRAME,
IRTSP_SKIP_FRAME
} RtspParserState;
/**
* GstIRTSPParse:
*
* The opaque GstIRTSPParse object
*/
struct _GstIRTSPParse {
struct _GstIRTSPParse
{
GstBaseParse baseparse;
guint8 channel_id;
/*< private >*/
guint8 target_channel_id;
/*< private > */
RtspParserState state;
guint16 frame_size;
guint current_offset;
gboolean discont;
};
/**
@ -59,12 +70,12 @@ struct _GstIRTSPParse {
*
* The opaque GstIRTSPParseClass data structure.
*/
struct _GstIRTSPParseClass {
struct _GstIRTSPParseClass
{
GstBaseParseClass baseparse_class;
};
GType gst_irtsp_parse_get_type (void);
G_END_DECLS
#endif /* __GST_IRTSP_PARSE_H__ */