From 2b457a46a06c9ada62942aa1b1773fc59530523a Mon Sep 17 00:00:00 2001 From: Julien MOUTTE Date: Wed, 6 Jan 2016 21:39:00 +0000 Subject: [PATCH] rtpmsink: Implement setcaps that uses streamheader This allow adding rtmpsink after the flv streaming have started. Otherwise, FLV streamheader is never sent to the server, which cannot figure-out what is this stream about. It should also help in certain renegotiation figures. The sink will no longer work without an streamheader in caps, though there is no known implementation of flvdemux that does not support this. https://bugzilla.gnome.org/show_bug.cgi?id=760242 --- ext/rtmp/gstrtmpsink.c | 89 +++++++++++++++++++++++++++++++----------- ext/rtmp/gstrtmpsink.h | 2 +- 2 files changed, 68 insertions(+), 23 deletions(-) diff --git a/ext/rtmp/gstrtmpsink.c b/ext/rtmp/gstrtmpsink.c index 462ac4d068..781f3bfe3f 100644 --- a/ext/rtmp/gstrtmpsink.c +++ b/ext/rtmp/gstrtmpsink.c @@ -76,6 +76,7 @@ static void gst_rtmp_sink_finalize (GObject * object); static gboolean gst_rtmp_sink_stop (GstBaseSink * sink); static gboolean gst_rtmp_sink_start (GstBaseSink * sink); static gboolean gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event); +static gboolean gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps); static GstFlowReturn gst_rtmp_sink_render (GstBaseSink * sink, GstBuffer * buf); #define gst_rtmp_sink_parent_class parent_class @@ -114,7 +115,8 @@ gst_rtmp_sink_class_init (GstRTMPSinkClass * klass) gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_sink_start); gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_sink_stop); gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp_sink_render); - gstbasesink_class->event = gst_rtmp_sink_event; + gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp_sink_setcaps); + gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp_sink_event); GST_DEBUG_CATEGORY_INIT (gst_rtmp_sink_debug, "rtmpsink", 0, "RTMP server element"); @@ -200,8 +202,10 @@ gst_rtmp_sink_stop (GstBaseSink * basesink) { GstRTMPSink *sink = GST_RTMP_SINK (basesink); - gst_buffer_replace (&sink->cache, NULL); - + if (sink->header) { + gst_buffer_unref (sink->header); + sink->header = NULL; + } if (sink->rtmp) { RTMP_Close (sink->rtmp); RTMP_Free (sink->rtmp); @@ -219,7 +223,7 @@ static GstFlowReturn gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf) { GstRTMPSink *sink = GST_RTMP_SINK (bsink); - GstBuffer *reffed_buf = NULL; + gboolean need_unref = FALSE; GstMapInfo map = GST_MAP_INFO_INIT; if (sink->rtmp == NULL) { @@ -228,6 +232,11 @@ gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf) return GST_FLOW_ERROR; } + /* Ignore buffers that are in the stream headers (caps) */ + if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) { + return GST_FLOW_OK; + } + if (sink->first) { /* open the connection */ if (!RTMP_IsConnected (sink->rtmp)) { @@ -245,21 +254,14 @@ gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf) GST_DEBUG_OBJECT (sink, "Opened connection to %s", sink->rtmp_uri); } - /* FIXME: Parse the first buffer and see if it contains a header plus a packet instead - * of just assuming it's only the header */ - GST_LOG_OBJECT (sink, "Caching first buffer of size %" G_GSIZE_FORMAT - " for concatenation", gst_buffer_get_size (buf)); - gst_buffer_replace (&sink->cache, buf); - sink->first = FALSE; - return GST_FLOW_OK; - } + /* Prepend the header from the caps to the first non header buffer */ + if (sink->header) { + buf = gst_buffer_append (gst_buffer_ref (sink->header), + gst_buffer_ref (buf)); + need_unref = TRUE; + } - if (sink->cache) { - GST_LOG_OBJECT (sink, "Joining 2nd buffer of size %" G_GSIZE_FORMAT - " to cached buf", gst_buffer_get_size (buf)); - gst_buffer_ref (buf); - reffed_buf = buf = gst_buffer_append (sink->cache, buf); - sink->cache = NULL; + sink->first = FALSE; } if (sink->have_write_error) @@ -274,8 +276,8 @@ gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf) goto write_failed; gst_buffer_unmap (buf, &map); - if (reffed_buf) - gst_buffer_unref (reffed_buf); + if (need_unref) + gst_buffer_unref (buf); return GST_FLOW_OK; @@ -284,8 +286,8 @@ write_failed: { GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data")); gst_buffer_unmap (buf, &map); - if (reffed_buf) - gst_buffer_unref (reffed_buf); + if (need_unref) + gst_buffer_unref (buf); sink->have_write_error = TRUE; return GST_FLOW_ERROR; } @@ -391,6 +393,49 @@ gst_rtmp_sink_set_property (GObject * object, guint prop_id, } } +static gboolean +gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps) +{ + GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink); + GstStructure *s; + const GValue *sh; + GArray *buffers; + gint i; + + GST_DEBUG_OBJECT (sink, "caps set to %" GST_PTR_FORMAT, caps); + + /* Clear our current header buffer */ + if (rtmpsink->header) { + gst_buffer_unref (rtmpsink->header); + rtmpsink->header = NULL; + } + + rtmpsink->header = gst_buffer_new (); + + s = gst_caps_get_structure (caps, 0); + + sh = gst_structure_get_value (s, "streamheader"); + buffers = g_value_peek_pointer (sh); + + /* Concatenate all buffers in streamheader into one */ + for (i = 0; i < buffers->len; ++i) { + GValue *val; + GstBuffer *buf; + + val = &g_array_index (buffers, GValue, i); + buf = g_value_peek_pointer (val); + + gst_buffer_ref (buf); + + rtmpsink->header = gst_buffer_append (rtmpsink->header, buf); + } + + GST_DEBUG_OBJECT (rtmpsink, "have %" G_GSIZE_FORMAT " bytes of header data", + gst_buffer_get_size (rtmpsink->header)); + + return TRUE; +} + static gboolean gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event) { diff --git a/ext/rtmp/gstrtmpsink.h b/ext/rtmp/gstrtmpsink.h index 523f5ae0a4..b9072f5000 100644 --- a/ext/rtmp/gstrtmpsink.h +++ b/ext/rtmp/gstrtmpsink.h @@ -53,7 +53,7 @@ struct _GstRTMPSink { RTMP *rtmp; gchar *rtmp_uri; /* copy of url for librtmp */ - GstBuffer *cache; /* Cached buffer */ + GstBuffer *header; gboolean first; gboolean have_write_error; };