gstreamer/ext/rtmp/gstrtmpsink.c
Julien MOUTTE 2b457a46a0 rtpmsink: Implement setcaps that uses streamheader
This allow adding rtmpsink after the flv streaming have started. Otherwise,
FLV streamheader is never sent to the server, which cannot figure-out
what is this stream about. It should also help in certain renegotiation
figures. The sink will no longer work without an streamheader in caps,
though there is no known implementation of flvdemux that does not
support this.

https://bugzilla.gnome.org/show_bug.cgi?id=760242
2016-02-09 19:35:22 -05:00

469 lines
12 KiB
C

/*
* GStreamer
* Copyright (C) 2010 Jan Schmidt <thaytan@noraisin.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-rtmpsink
*
* This element delivers data to a streaming server via RTMP. It uses
* librtmp, and supports any protocols/urls that librtmp supports.
* The URL/location can contain extra connection or session parameters
* for librtmp, such as 'flashver=version'. See the librtmp documentation
* for more detail
*
* <refsect2>
* <title>Example launch line</title>
* |[
* gst-launch-1.0 -v videotestsrc ! ffenc_flv ! flvmux ! rtmpsink location='rtmp://localhost/path/to/stream live=1'
* ]| Encode a test video stream to FLV video format and stream it via RTMP.
* </refsect2>
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include "gstrtmpsink.h"
#ifdef G_OS_WIN32
#include <winsock2.h>
#endif
#include <stdlib.h>
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_sink_debug);
#define GST_CAT_DEFAULT gst_rtmp_sink_debug
#define DEFAULT_LOCATION NULL
enum
{
PROP_0,
PROP_LOCATION
};
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("video/x-flv")
);
static void gst_rtmp_sink_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static void gst_rtmp_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_rtmp_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_rtmp_sink_finalize (GObject * object);
static gboolean gst_rtmp_sink_stop (GstBaseSink * sink);
static gboolean gst_rtmp_sink_start (GstBaseSink * sink);
static gboolean gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event);
static gboolean gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps);
static GstFlowReturn gst_rtmp_sink_render (GstBaseSink * sink, GstBuffer * buf);
#define gst_rtmp_sink_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstRTMPSink, gst_rtmp_sink, GST_TYPE_BASE_SINK,
G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
gst_rtmp_sink_uri_handler_init));
/* initialize the plugin's class */
static void
gst_rtmp_sink_class_init (GstRTMPSinkClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
GstBaseSinkClass *gstbasesink_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
gstbasesink_class = (GstBaseSinkClass *) klass;
gobject_class->finalize = gst_rtmp_sink_finalize;
gobject_class->set_property = gst_rtmp_sink_set_property;
gobject_class->get_property = gst_rtmp_sink_get_property;
g_object_class_install_property (gobject_class, PROP_LOCATION,
g_param_spec_string ("location", "RTMP Location", "RTMP url",
DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_set_static_metadata (gstelement_class,
"RTMP output sink",
"Sink/Network", "Sends FLV content to a server via RTMP",
"Jan Schmidt <thaytan@noraisin.net>");
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sink_template));
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_sink_start);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_sink_stop);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp_sink_render);
gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp_sink_setcaps);
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp_sink_event);
GST_DEBUG_CATEGORY_INIT (gst_rtmp_sink_debug, "rtmpsink", 0,
"RTMP server element");
}
/* initialize the new element
* initialize instance structure
*/
static void
gst_rtmp_sink_init (GstRTMPSink * sink)
{
#ifdef G_OS_WIN32
WSADATA wsa_data;
if (WSAStartup (MAKEWORD (2, 2), &wsa_data) != 0) {
GST_ERROR_OBJECT (sink, "WSAStartup failed: 0x%08x", WSAGetLastError ());
}
#endif
}
static void
gst_rtmp_sink_finalize (GObject * object)
{
GstRTMPSink *sink = GST_RTMP_SINK (object);
#ifdef G_OS_WIN32
WSACleanup ();
#endif
g_free (sink->uri);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static gboolean
gst_rtmp_sink_start (GstBaseSink * basesink)
{
GstRTMPSink *sink = GST_RTMP_SINK (basesink);
if (!sink->uri) {
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
("Please set URI for RTMP output"), ("No URI set before starting"));
return FALSE;
}
sink->rtmp_uri = g_strdup (sink->uri);
sink->rtmp = RTMP_Alloc ();
if (!sink->rtmp) {
GST_ERROR_OBJECT (sink, "Could not allocate librtmp's RTMP context");
goto error;
}
RTMP_Init (sink->rtmp);
if (!RTMP_SetupURL (sink->rtmp, sink->rtmp_uri)) {
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
("Failed to setup URL '%s'", sink->uri));
goto error;
}
GST_DEBUG_OBJECT (sink, "Created RTMP object");
/* Mark this as an output connection */
RTMP_EnableWrite (sink->rtmp);
sink->first = TRUE;
sink->have_write_error = FALSE;
return TRUE;
error:
if (sink->rtmp) {
RTMP_Free (sink->rtmp);
sink->rtmp = NULL;
}
g_free (sink->rtmp_uri);
sink->rtmp_uri = NULL;
return FALSE;
}
static gboolean
gst_rtmp_sink_stop (GstBaseSink * basesink)
{
GstRTMPSink *sink = GST_RTMP_SINK (basesink);
if (sink->header) {
gst_buffer_unref (sink->header);
sink->header = NULL;
}
if (sink->rtmp) {
RTMP_Close (sink->rtmp);
RTMP_Free (sink->rtmp);
sink->rtmp = NULL;
}
if (sink->rtmp_uri) {
g_free (sink->rtmp_uri);
sink->rtmp_uri = NULL;
}
return TRUE;
}
static GstFlowReturn
gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstRTMPSink *sink = GST_RTMP_SINK (bsink);
gboolean need_unref = FALSE;
GstMapInfo map = GST_MAP_INFO_INIT;
if (sink->rtmp == NULL) {
/* Do not crash */
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data"));
return GST_FLOW_ERROR;
}
/* Ignore buffers that are in the stream headers (caps) */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
return GST_FLOW_OK;
}
if (sink->first) {
/* open the connection */
if (!RTMP_IsConnected (sink->rtmp)) {
if (!RTMP_Connect (sink->rtmp, NULL)
|| !RTMP_ConnectStream (sink->rtmp, 0)) {
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
("Could not connect to RTMP stream \"%s\" for writing", sink->uri));
RTMP_Free (sink->rtmp);
sink->rtmp = NULL;
g_free (sink->rtmp_uri);
sink->rtmp_uri = NULL;
sink->have_write_error = TRUE;
return GST_FLOW_ERROR;
}
GST_DEBUG_OBJECT (sink, "Opened connection to %s", sink->rtmp_uri);
}
/* Prepend the header from the caps to the first non header buffer */
if (sink->header) {
buf = gst_buffer_append (gst_buffer_ref (sink->header),
gst_buffer_ref (buf));
need_unref = TRUE;
}
sink->first = FALSE;
}
if (sink->have_write_error)
goto write_failed;
GST_LOG_OBJECT (sink, "Sending %" G_GSIZE_FORMAT " bytes to RTMP server",
gst_buffer_get_size (buf));
gst_buffer_map (buf, &map, GST_MAP_READ);
if (RTMP_Write (sink->rtmp, (char *) map.data, map.size) <= 0)
goto write_failed;
gst_buffer_unmap (buf, &map);
if (need_unref)
gst_buffer_unref (buf);
return GST_FLOW_OK;
/* ERRORS */
write_failed:
{
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data"));
gst_buffer_unmap (buf, &map);
if (need_unref)
gst_buffer_unref (buf);
sink->have_write_error = TRUE;
return GST_FLOW_ERROR;
}
}
/*
* URI interface support.
*/
static GstURIType
gst_rtmp_sink_uri_get_type (GType type)
{
return GST_URI_SINK;
}
static const gchar *const *
gst_rtmp_sink_uri_get_protocols (GType type)
{
static const gchar *protocols[] =
{ "rtmp", "rtmpt", "rtmps", "rtmpe", "rtmfp", "rtmpte", "rtmpts", NULL };
return protocols;
}
static gchar *
gst_rtmp_sink_uri_get_uri (GstURIHandler * handler)
{
GstRTMPSink *sink = GST_RTMP_SINK (handler);
/* FIXME: make thread-safe */
return g_strdup (sink->uri);
}
static gboolean
gst_rtmp_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
GError ** error)
{
GstRTMPSink *sink = GST_RTMP_SINK (handler);
gboolean ret = TRUE;
if (GST_STATE (sink) >= GST_STATE_PAUSED) {
g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
"Changing the URI on rtmpsink when it is running is not supported");
return FALSE;
}
g_free (sink->uri);
sink->uri = NULL;
if (uri != NULL) {
int protocol;
AVal host;
unsigned int port;
AVal playpath, app;
if (!RTMP_ParseURL (uri, &protocol, &host, &port, &playpath, &app) ||
!host.av_len) {
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
("Failed to parse URI %s", uri), (NULL));
g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
"Could not parse RTMP URI");
ret = FALSE;
} else {
sink->uri = g_strdup (uri);
}
if (playpath.av_val)
free (playpath.av_val);
}
if (ret) {
sink->have_write_error = FALSE;
GST_DEBUG_OBJECT (sink, "Changed URI to %s", GST_STR_NULL (uri));
}
return ret;
}
static void
gst_rtmp_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
iface->get_type = gst_rtmp_sink_uri_get_type;
iface->get_protocols = gst_rtmp_sink_uri_get_protocols;
iface->get_uri = gst_rtmp_sink_uri_get_uri;
iface->set_uri = gst_rtmp_sink_uri_set_uri;
}
static void
gst_rtmp_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstRTMPSink *sink = GST_RTMP_SINK (object);
switch (prop_id) {
case PROP_LOCATION:
gst_rtmp_sink_uri_set_uri (GST_URI_HANDLER (sink),
g_value_get_string (value), NULL);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static gboolean
gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps)
{
GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink);
GstStructure *s;
const GValue *sh;
GArray *buffers;
gint i;
GST_DEBUG_OBJECT (sink, "caps set to %" GST_PTR_FORMAT, caps);
/* Clear our current header buffer */
if (rtmpsink->header) {
gst_buffer_unref (rtmpsink->header);
rtmpsink->header = NULL;
}
rtmpsink->header = gst_buffer_new ();
s = gst_caps_get_structure (caps, 0);
sh = gst_structure_get_value (s, "streamheader");
buffers = g_value_peek_pointer (sh);
/* Concatenate all buffers in streamheader into one */
for (i = 0; i < buffers->len; ++i) {
GValue *val;
GstBuffer *buf;
val = &g_array_index (buffers, GValue, i);
buf = g_value_peek_pointer (val);
gst_buffer_ref (buf);
rtmpsink->header = gst_buffer_append (rtmpsink->header, buf);
}
GST_DEBUG_OBJECT (rtmpsink, "have %" G_GSIZE_FORMAT " bytes of header data",
gst_buffer_get_size (rtmpsink->header));
return TRUE;
}
static gboolean
gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event)
{
GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink);
switch (event->type) {
case GST_EVENT_FLUSH_STOP:
rtmpsink->have_write_error = FALSE;
break;
default:
break;
}
return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
}
static void
gst_rtmp_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstRTMPSink *sink = GST_RTMP_SINK (object);
switch (prop_id) {
case PROP_LOCATION:
g_value_set_string (value, sink->uri);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}