rtmp2: Add support for AGGREGATE messages

They're multiple frames (tags) of FLV data wrapped into a message.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1384>
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-06-29 19:47:16 +02:00 committed by GStreamer Merge Bot
parent 30b1187108
commit 2ad3aab1d4

View file

@ -105,6 +105,8 @@ static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
guint needed_bytes);
static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
connection, GstBuffer * buffer);
static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
connection, GstBuffer * buffer);
static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
@ -699,6 +701,10 @@ gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
gst_rtmp_connection_handle_cm (sc, buffer);
return;
case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
gst_rtmp_connection_handle_aggregate (sc, buffer);
break;
default:
if (sc->input_handler) {
sc->input_handler (sc, buffer, sc->input_handler_user_data);
@ -707,6 +713,83 @@ gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
}
}
static void
gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
GstBuffer * buffer)
{
GstRtmpMeta *meta;
GstMapInfo map;
gsize pos = 0;
guint32 first_ts = 0;
meta = gst_buffer_get_rtmp_meta (buffer);
g_return_if_fail (meta);
gst_buffer_map (buffer, &map, GST_MAP_READ);
GST_TRACE_OBJECT (connection, "got aggregate message");
/* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
* The payload is part of a FLV file.
*
* WARNING: This spec defines the payload to use an "RTMP message format"
* which misidentifies the format of the timestamps and omits the size of the
* backpointers. */
while (pos < map.size) {
gsize remaining = map.size - pos;
GstBuffer *submessage;
GstRtmpMeta *submeta;
GstRtmpFlvTagHeader header;
if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
GST_ERROR_OBJECT (connection,
"aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
break;
}
if (remaining < header.total_size) {
GST_ERROR_OBJECT (connection,
"aggregate contains incomplete message; want %" G_GSIZE_FORMAT
", got %" G_GSIZE_FORMAT, header.total_size, remaining);
break;
}
submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
GST_BUFFER_OFFSET_END (submessage) =
GST_BUFFER_OFFSET (submessage) + header.total_size;
submeta = gst_buffer_get_rtmp_meta (submessage);
g_assert (submeta);
submeta->type = header.type;
submeta->size = header.payload_size;
if (pos == 0) {
first_ts = header.timestamp;
} else {
guint32 ts_offset = header.timestamp - first_ts;
submeta->ts_delta += ts_offset;
GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
}
gst_rtmp_buffer_dump (submessage, "<<< submessage");
gst_rtmp_connection_handle_message (connection, submessage);
gst_buffer_unref (submessage);
pos += header.total_size;
}
gst_buffer_unmap (buffer, &map);
}
static void
gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
GstBuffer * buffer)